From f48fe2a1ff560c86e93f96d59de01e4e5b1a650a Mon Sep 17 00:00:00 2001 From: bhack Date: Sat, 9 May 2026 05:37:55 +0200 Subject: [PATCH] Harden live UI stress checks --- tests/test_mini_eq_atspi_widgets.py | 103 ++++++++- tools/check_live_ui_runtime.py | 312 ++++++++++++++++++++++------ 2 files changed, 354 insertions(+), 61 deletions(-) diff --git a/tests/test_mini_eq_atspi_widgets.py b/tests/test_mini_eq_atspi_widgets.py index 4d2e233..e57089c 100644 --- a/tests/test_mini_eq_atspi_widgets.py +++ b/tests/test_mini_eq_atspi_widgets.py @@ -143,6 +143,26 @@ def find_accessible_with_roles(root, *, name, roles, showing=None): return None +def has_descendant_name(root, name): + for node in iter_accessibles(root): + if node is root: + continue + if accessible_name(node) == name: + return True + return False + + +def find_list_item_with_descendant(root, *, descendant_name, showing=None): + for node in iter_accessibles(root): + if accessible_role(node) != "list item": + continue + if showing is not None and state_contains(node, pyatspi.STATE_SHOWING) != showing: + continue + if has_descendant_name(node, descendant_name): + return node + return None + + def snapshot_frames(root): rows = [] for node in iter_accessibles(root): @@ -152,6 +172,34 @@ def snapshot_frames(root): return rows +def snapshot_showing_controls(root, limit=120): + rows = [] + interesting_roles = { + "combo box", + "label", + "list item", + "menu item", + "push button", + "slider", + "spin button", + "status bar", + "switch", + "text", + "toggle button", + } + for node in iter_accessibles(root): + if not state_contains(node, pyatspi.STATE_SHOWING): + continue + role = accessible_role(node) + name = accessible_name(node) + if role not in interesting_roles or not name: + continue + rows.append((role, name)) + if len(rows) >= limit: + return rows + return rows + + ACCESSIBLE_EVENT = threading.Event() @@ -175,13 +223,14 @@ def stop_accessible_event_loop(event_thread): event_thread.join(timeout=2.0) -def wait_for(description, predicate): - deadline = time.monotonic() + WAIT_TIMEOUT_SECONDS +def wait_for(description, predicate, timeout_seconds=WAIT_TIMEOUT_SECONDS): + deadline = time.monotonic() + timeout_seconds def timeout_error(): desktop = pyatspi.Registry.getDesktop(0) return AssertionError( f"Timed out waiting for {description}; frames: {snapshot_frames(desktop)!r}\n" + f"Showing controls: {snapshot_showing_controls(desktop)!r}\n" f"Mini EQ log:\n{app_log_path.read_text(errors='replace')}\n" f"Shell log:\n{shell_log_path.read_text(errors='replace')}" ) @@ -212,6 +261,10 @@ def sensitive(node): return state_contains(node, pyatspi.STATE_SENSITIVE) +def expanded(node): + return state_contains(node, pyatspi.STATE_EXPANDED) + + def visible_switch_with_state(root, *, name, expected_checked): node = find_accessible(root, name=name, role="switch", showing=True) if node is None or checked(node) != expected_checked: @@ -257,6 +310,49 @@ def toggle_switch(node): run_accessible_action(node, ("toggle",)) +def verify_dropdown_exposes_options(frame, *, combo_name, required_options): + dropdown_timeout = 2.0 + combo = wait_for( + f"{combo_name} combo box", + lambda: find_accessible(frame, name=combo_name, role="combo box", showing=True), + ) + if not sensitive(combo): + raise AssertionError(f"{combo_name} combo box should be sensitive") + + toggle = wait_for( + f"{combo_name} dropdown toggle", + lambda: find_accessible(frame, name=combo_name, role="toggle button", showing=True), + ) + if not sensitive(toggle): + raise AssertionError(f"{combo_name} dropdown toggle should be sensitive") + + activate_control(toggle) + wait_for( + f"{combo_name} combo box to expand", + lambda: expanded(combo), + dropdown_timeout, + ) + try: + for option_name in required_options: + wait_for( + f"{option_name} dropdown list item", + lambda option_name=option_name: find_list_item_with_descendant( + pyatspi.Registry.getDesktop(0), + descendant_name=option_name, + showing=True, + ), + dropdown_timeout, + ) + finally: + activate_control(toggle) + + wait_for( + f"{combo_name} combo box to collapse", + lambda: not expanded(combo), + dropdown_timeout, + ) + + gnome_shell = require_tool("gnome-shell") shell_log = shell_log_path.open("w", encoding="utf-8") app_log = app_log_path.open("w", encoding="utf-8") @@ -304,7 +400,6 @@ def toggle_switch(node): ) atspi_event_thread = start_accessible_event_loop() - time.sleep(1.5) frame = wait_for( "Mini EQ frame", lambda: find_accessible( @@ -341,6 +436,8 @@ def toggle_switch(node): if find_accessible(frame, name="Off", role="status bar", showing=True) is None: raise AssertionError("Monitor Off status is missing") + verify_dropdown_exposes_options(frame, combo_name="Type", required_options=("Notch", "Bell")) + toggle_switch(monitor_switch) wait_for( "Monitor switch to turn on", diff --git a/tools/check_live_ui_runtime.py b/tools/check_live_ui_runtime.py index 49a0e44..2ff4443 100755 --- a/tools/check_live_ui_runtime.py +++ b/tools/check_live_ui_runtime.py @@ -22,11 +22,18 @@ REPO_ROOT = Path(__file__).resolve().parents[1] APP_FRAME_NAME = "Mini EQ" VIRTUAL_SINK_NAME = "mini_eq_sink" +FILTER_OUTPUT_NAME = f"{VIRTUAL_SINK_NAME}_output" SMOKE_APPLICATION_NAME = "mini-eq-live-ui-smoke" SMOKE_NODE_NAME = "mini-eq-live-ui-smoke" ANALYZER_NODE_NAME = "mini-eq-analyzer" PRIMARY_SINK_NAME = "ci_null_sink" ALT_SINK_NAME = "ci_alt_sink" +DEFAULT_AUDIO_SINK_KEY = "default.audio.sink" +DEFAULT_CONFIGURED_AUDIO_SINK_KEY = "default.configured.audio.sink" +DEFAULT_METADATA_VALUE_TYPE = "Spa:String:JSON" +CONTROL_BUS_NAME = "io.github.bhack.mini-eq" +CONTROL_OBJECT_PATH = "/io/github/bhack/mini_eq/Control" +CONTROL_INTERFACE_NAME = "io.github.bhack.MiniEq.Control" WAIT_EVENT_NAMES = ( "window", "object:children-changed", @@ -37,6 +44,9 @@ TARGET_OBJECT_RE = re.compile( r"update: id:(?P\d+) key:'target\.object' value:'(?P[^']*)' type:'(?P[^']*)'" ) +METADATA_VALUE_RE = re.compile( + r"update: id:(?P\d+) key:'(?P[^']*)' value:'(?P[^']*)' type:'(?P[^']*)'" +) def format_command(command: list[str | Path]) -> str: @@ -148,6 +158,99 @@ def metadata_targets() -> dict[int, tuple[str, str]]: return targets +def parse_metadata_node_name(value: str | None) -> str | None: + if not value: + return None + + try: + payload = json.loads(value) + except json.JSONDecodeError: + return value + + if not isinstance(payload, dict): + return None + + name = payload.get("name") + return str(name) if name else None + + +def metadata_value(key: str) -> tuple[str | None, str | None]: + result = subprocess.run( + ["pw-metadata", "-n", "default", "0", key], + check=True, + text=True, + stdout=subprocess.PIPE, + ) + value: tuple[str | None, str | None] = (None, None) + for line in result.stdout.splitlines(): + match = METADATA_VALUE_RE.search(line) + if match is not None and match.group("key") == key: + value = (match.group("value"), match.group("type")) + return value + + +def configured_default_sink_name() -> str | None: + value, _type_name = metadata_value(DEFAULT_CONFIGURED_AUDIO_SINK_KEY) + return parse_metadata_node_name(value) + + +def default_sink_name() -> str | None: + value, _type_name = metadata_value(DEFAULT_AUDIO_SINK_KEY) + return parse_metadata_node_name(value) + + +def set_configured_default_sink_name(sink_name: str, timeout_seconds: float) -> None: + value = json.dumps({"name": sink_name}, separators=(",", ":")) + subprocess.run( + [ + "pw-metadata", + "-n", + "default", + "0", + DEFAULT_CONFIGURED_AUDIO_SINK_KEY, + value, + DEFAULT_METADATA_VALUE_TYPE, + ], + check=True, + text=True, + stdout=subprocess.DEVNULL, + ) + wait_for( + f"PipeWire configured default sink metadata to become {sink_name}", + lambda: configured_default_sink_name() == sink_name, + timeout_seconds, + ) + + +def verify_pipewire_gobject_probe(timeout_seconds: float) -> None: + src_path = str(REPO_ROOT / "src") + if src_path not in sys.path: + sys.path.insert(0, src_path) + + try: + from mini_eq.pipewire_backend import PipeWireBackend + except Exception as exc: + raise RuntimeError(f"failed to import Mini EQ pipewire-gobject backend: {exc}") from exc + + backend = PipeWireBackend(timeout_ms=max(1, int(timeout_seconds * 1000))) + try: + backend.connect() + for sink_name in (PRIMARY_SINK_NAME, ALT_SINK_NAME): + sink = backend.audio_sink_by_name(sink_name) + if sink is None: + raise RuntimeError(f"pipewire-gobject probe did not see sink {sink_name}") + if not sink.object_serial: + raise RuntimeError(f"pipewire-gobject probe saw {sink_name} without object.serial") + + defaults = backend.refresh_defaults() + if defaults.configured_audio_sink != PRIMARY_SINK_NAME: + raise RuntimeError( + f"pipewire-gobject probe read unexpected configured default sink: {defaults.configured_audio_sink!r}" + ) + finally: + backend.close() + + def default_metadata_is_ready() -> bool: result = subprocess.run( ["pw-metadata", "-n", "default"], @@ -160,6 +263,13 @@ def default_metadata_is_ready() -> bool: return result.returncode == 0 +def default_output_metadata_is_ready() -> bool: + if not default_metadata_is_ready(): + return False + + return default_sink_name() in {PRIMARY_SINK_NAME, ALT_SINK_NAME} + + def write_settings(config_dir: Path) -> None: settings_dir = config_dir / "mini-eq" settings_dir.mkdir(parents=True, exist_ok=True) @@ -348,6 +458,26 @@ def find_accessible_with_roles(root, pyatspi, *, name: str, roles: set[str], sho return None +def has_descendant_name(root, name: str) -> bool: + for node in iter_accessibles(root): + if node is root: + continue + if accessible_name(node) == name: + return True + return False + + +def find_list_item_with_descendant(root, pyatspi, *, descendant_name: str, showing: bool | None = None): + for node in iter_accessibles(root): + if accessible_role(node) != "list item": + continue + if showing is not None and state_contains(node, pyatspi.STATE_SHOWING) != showing: + continue + if has_descendant_name(node, descendant_name): + return node + return None + + def snapshot_frames(root, pyatspi) -> list[tuple[str, str, bool]]: rows = [] for node in iter_accessibles(root): @@ -426,12 +556,23 @@ def checked(self, node) -> bool: def sensitive(self, node) -> bool: return state_contains(node, self.pyatspi.STATE_SENSITIVE) + def expanded(self, node) -> bool: + return state_contains(node, self.pyatspi.STATE_EXPANDED) + def find(self, root, *, name: str, role: str | None = None, showing: bool | None = None): return find_accessible(root, self.pyatspi, name=name, role=role, showing=showing) def find_with_roles(self, root, *, name: str, roles: set[str], showing: bool | None = None): return find_accessible_with_roles(root, self.pyatspi, name=name, roles=roles, showing=showing) + def find_list_item_with_descendant(self, root, *, descendant_name: str, showing: bool | None = None): + return find_list_item_with_descendant( + root, + self.pyatspi, + descendant_name=descendant_name, + showing=showing, + ) + def visible_switch_with_state(self, root, *, name: str, expected_checked: bool): node = self.find(root, name=name, role="switch", showing=True) if node is None or self.checked(node) != expected_checked: @@ -510,10 +651,6 @@ def socket_ready() -> bool: wait_for(f"nested Wayland socket {socket_path}", socket_ready, 20.0) -def app_log_contains(app_log_path: Path, text: str) -> bool: - return text in app_log_path.read_text(errors="replace") - - def no_traceback(app_log_path: Path) -> bool: return "Traceback (most recent call last)" not in app_log_path.read_text(errors="replace") @@ -538,6 +675,54 @@ def wait_for_route_away_from_virtual(smoke_id: int, virtual_serial: str, timeout ) +def wait_for_node_target_object(node_name: str, target_serial: str, timeout_seconds: float) -> None: + wait_for( + f"{node_name} target.object metadata to point at {target_serial}", + lambda: ( + (node := node_by_name(node_name)) is not None + and metadata_targets().get(node_id(node)) == (target_serial, "Spa:Id") + ), + timeout_seconds, + ) + + +def unpack_variant(value): + unpack = getattr(value, "unpack", None) + return unpack() if callable(unpack) else value + + +def control_state(timeout_seconds: float) -> dict[str, Any]: + from gi.repository import Gio, GLib + + connection = Gio.bus_get_sync(Gio.BusType.SESSION, None) + result = connection.call_sync( + CONTROL_BUS_NAME, + CONTROL_OBJECT_PATH, + CONTROL_INTERFACE_NAME, + "GetState", + None, + GLib.VariantType.new("(a{sv})"), + Gio.DBusCallFlags.NONE, + max(1000, int(min(timeout_seconds, 5.0) * 1000)), + None, + ) + raw_state = result.unpack()[0] + return {str(key): unpack_variant(value) for key, value in raw_state.items()} + + +def control_output_sink(timeout_seconds: float) -> str: + value = control_state(timeout_seconds).get("output_sink", "") + return str(value) if value is not None else "" + + +def wait_for_control_output_sink(sink_name: str, timeout_seconds: float) -> None: + wait_for( + f"Mini EQ D-Bus output sink to become {sink_name}", + lambda: control_output_sink(timeout_seconds) == sink_name, + timeout_seconds, + ) + + def start_nested_shell(runtime_dir: Path, wayland_name: str, log_path: Path) -> subprocess.Popen[str]: gnome_shell = require_tool("gnome-shell") shell_log = log_path.open("w", encoding="utf-8") @@ -572,7 +757,7 @@ def start_app(repo_root: Path, wayland_name: str, app_log_path: Path) -> subproc env.pop("DISPLAY", None) app_log = app_log_path.open("w", encoding="utf-8") process = subprocess.Popen( - [sys.executable, "-m", "mini_eq", "--auto-route", "--output-sink", PRIMARY_SINK_NAME], + [sys.executable, "-m", "mini_eq", "--auto-route"], cwd=repo_root, env=env, stdout=app_log, @@ -583,37 +768,56 @@ def start_app(repo_root: Path, wayland_name: str, app_log_path: Path) -> subproc return process -def choose_dropdown_option( +def verify_dropdown_exposes_options( driver: UiDriver, frame, *, combo_name: str, - option_name: str, + required_options: tuple[str, ...], timeout_seconds: float, ) -> None: dropdown_timeout = min(timeout_seconds, 5.0) combo = driver.wait_for_accessible( - f"{combo_name} combo", - lambda: driver.find_with_roles( - frame, - name=combo_name, - roles={"combo box", "push button", "toggle button"}, - showing=True, - ), + f"{combo_name} combo box", + lambda: driver.find(frame, name=combo_name, role="combo box", showing=True), dropdown_timeout, ) - driver.activate(combo) - option = driver.wait_for_accessible( - f"{option_name} dropdown option", - lambda: driver.find_with_roles( - driver.desktop(), - name=option_name, - roles={"menu item", "list item", "push button", "toggle button", "label"}, - showing=True, - ), + if not driver.sensitive(combo): + raise AssertionError(f"{combo_name} combo box should be sensitive") + + toggle = driver.wait_for_accessible( + f"{combo_name} dropdown toggle", + lambda: driver.find(frame, name=combo_name, role="toggle button", showing=True), + dropdown_timeout, + ) + if not driver.sensitive(toggle): + raise AssertionError(f"{combo_name} dropdown should be sensitive") + + driver.activate(toggle) + driver.wait_for_accessible( + f"{combo_name} combo box to expand", + lambda: driver.expanded(combo), + dropdown_timeout, + ) + try: + for option_name in required_options: + driver.wait_for_accessible( + f"{option_name} dropdown list item", + lambda option_name=option_name: driver.find_list_item_with_descendant( + driver.desktop(), + descendant_name=option_name, + showing=True, + ), + dropdown_timeout, + ) + finally: + driver.activate(toggle) + + driver.wait_for_accessible( + f"{combo_name} combo box to collapse", + lambda: not driver.expanded(combo), dropdown_timeout, ) - driver.activate(option) def run_ui_flow( @@ -633,9 +837,11 @@ def run_ui_flow( app: subprocess.Popen[str] | None = None smoke: subprocess.Popen[str] | None = None event_thread: threading.Thread | None = None - output_switch_verified = False try: + set_configured_default_sink_name(PRIMARY_SINK_NAME, timeout_seconds) + verify_pipewire_gobject_probe(timeout_seconds) + audio_file = create_sine_wav(tmp_dir / "mini-eq-live-ui-smoke.wav", audio_duration) smoke = start_smoke_stream(audio_file) smoke_node = wait_for( @@ -676,6 +882,14 @@ def run_ui_flow( if not driver.sensitive(compare_switch): raise AssertionError("Compare switch should become sensitive when routing is active") + verify_dropdown_exposes_options( + driver, + frame, + combo_name="Type", + required_options=("Notch", "Bell"), + timeout_seconds=timeout_seconds, + ) + route_switch = driver.wait_for_accessible( "System-wide EQ switch to start active", lambda: driver.visible_switch_with_state(frame, name="System-wide EQ", expected_checked=True), @@ -690,6 +904,7 @@ def run_ui_flow( virtual_sink = wait_for_sink(VIRTUAL_SINK_NAME, timeout_seconds) virtual_serial = object_serial(virtual_sink) wait_for_route_to_virtual(smoke_id, virtual_serial, timeout_seconds) + wait_for_control_output_sink(PRIMARY_SINK_NAME, timeout_seconds) for cycle in range(cycles): print(f"## route toggle cycle {cycle + 1}/{cycles}", flush=True) @@ -709,37 +924,18 @@ def run_ui_flow( ) wait_for_route_to_virtual(smoke_id, virtual_serial, timeout_seconds) - try: - choose_dropdown_option( - driver, - frame, - combo_name="EQ output", - option_name="CI Alt Sink", - timeout_seconds=timeout_seconds, - ) - wait_for( - "Mini EQ to retarget the alternate output", - lambda: app_log_contains(app_log_path, f"-> {ALT_SINK_NAME}"), - timeout_seconds, - ) - wait_for_route_to_virtual(smoke_id, virtual_serial, timeout_seconds) + alt_sink = wait_for_sink(ALT_SINK_NAME, timeout_seconds) + alt_serial = object_serial(alt_sink) + set_configured_default_sink_name(ALT_SINK_NAME, timeout_seconds) + wait_for_control_output_sink(ALT_SINK_NAME, timeout_seconds) + wait_for_node_target_object(FILTER_OUTPUT_NAME, alt_serial, timeout_seconds) + wait_for_route_to_virtual(smoke_id, virtual_serial, timeout_seconds) - choose_dropdown_option( - driver, - frame, - combo_name="EQ output", - option_name="CI Null Sink", - timeout_seconds=timeout_seconds, - ) - wait_for( - "Mini EQ to retarget the primary output", - lambda: app_log_contains(app_log_path, f"-> {PRIMARY_SINK_NAME}"), - timeout_seconds, - ) - wait_for_route_to_virtual(smoke_id, virtual_serial, timeout_seconds) - output_switch_verified = True - except AssertionError as exc: - print(f"Output dropdown switch was not accessible in this run: {str(exc).splitlines()[0]}", flush=True) + primary_serial = object_serial(wait_for_sink(PRIMARY_SINK_NAME, timeout_seconds)) + set_configured_default_sink_name(PRIMARY_SINK_NAME, timeout_seconds) + wait_for_control_output_sink(PRIMARY_SINK_NAME, timeout_seconds) + wait_for_node_target_object(FILTER_OUTPUT_NAME, primary_serial, timeout_seconds) + wait_for_route_to_virtual(smoke_id, virtual_serial, timeout_seconds) if driver.checked(monitor_switch): driver.toggle_switch(monitor_switch) @@ -830,9 +1026,9 @@ def run_ui_flow( if not no_traceback(app_log_path): raise AssertionError(f"Mini EQ logged a traceback on shutdown:\n{app_log_path.read_text(errors='replace')}") - output_detail = "output retarget verified" if output_switch_verified else "output retarget skipped" print( - f"Live UI runtime smoke passed: AT-SPI UI flow, synthetic stream routing, monitor, {output_detail}, and shutdown verified." + "Live UI runtime smoke passed: AT-SPI UI flow, dropdown options, " + "pipewire-gobject probe, synthetic stream routing, default-output follow, monitor, and shutdown verified." ) finally: stop_accessible_event_loop(pyatspi, event_thread) @@ -892,7 +1088,7 @@ def run_helper(_args: argparse.Namespace) -> int: pipewire, wireplumber = start_pipewire_processes(tmp_dir) wait_for_sink(PRIMARY_SINK_NAME, timeout_seconds) wait_for_sink(ALT_SINK_NAME, timeout_seconds) - wait_for("WirePlumber default metadata", default_metadata_is_ready, timeout_seconds) + wait_for("WirePlumber default output metadata", default_output_metadata_is_ready, timeout_seconds) run_ui_flow( pyatspi=pyatspi, repo_root=REPO_ROOT,