diff --git a/packages/helpermodules/mosquitto_dynsec.py b/packages/helpermodules/mosquitto_dynsec.py index fa92d221ef..a0db2a2a9e 100644 --- a/packages/helpermodules/mosquitto_dynsec.py +++ b/packages/helpermodules/mosquitto_dynsec.py @@ -1,7 +1,9 @@ from json import load as json_load +import json import logging from pathlib import Path -from typing import TypedDict, Optional +import re +from typing import Tuple, TypedDict, Optional from helpermodules.subdata import SubData from helpermodules.utils.run_command import run_command @@ -15,6 +17,14 @@ class MosquittoAcl(TypedDict): allow: bool priority: int + def __eq__(self, other): + if self["acltype"] == other["acltype"]: + if re.sub(r'/\d+/', '//', self["topic"]) == re.sub(r'/\d+/', '//', other["topic"]): + if self["allow"] == other["allow"]: + if self["priority"] == other["priority"]: + return True + return False + class MosquittoRole(TypedDict): rolename: str @@ -83,6 +93,7 @@ def remove_acl_role(role_template: str, id: int): def check_roles_at_start(): + update_acls() flag_path = Path(Path(__file__).resolve().parents[2]/"ramdisk"/"init_user_management") if flag_path.is_file(): with open(flag_path, "r") as file: @@ -104,3 +115,130 @@ def check_roles_at_start(): if "io" in key: add_acl_role("io-device--access", value.config.id) flag_path.unlink() + + +def _extract_id_from_role_name(role_name: str) -> Optional[int]: + numbers = re.findall(r'\d+', role_name) + if numbers: + return int(numbers[0]) + return None + + +def _get_configured_role_data(role_name: str) -> Optional[MosquittoRole]: + role_output = run_command([ + "mosquitto_ctrl", "dynsec", "getRole", role_name]) + # Parse the text output since it's not JSON + role_data = {"rolename": role_name, "acls": []} + lines = role_output.strip().split('\n') + for line in lines[1:]: # Skip first line with rolename + if "ACLs:" in line: + line = line.replace("ACLs:", "") + if line.strip() and ':' in line: + parts = [p.strip() for p in line.split(':')] + if len(parts) >= 4: + acl = { + "acltype": parts[0], + "allow": parts[1] == "allow", + "topic": parts[2].split('(')[0].strip(), + "priority": int(parts[3].replace(')', '').strip()) + } + role_data["acls"].append(acl) + return role_data + + +VERSION_STRING = "openwb-version:" + + +def update_acls(): + def get_acl_versions() -> Tuple[str, str]: + for role in roles: + if VERSION_STRING in role: + current_version = role.split(":")[1] + break + else: + raise RuntimeError("openwb-version role not found") + + for role in dynsec_config["roles"]: + try: + if VERSION_STRING in role["rolename"]: + template_version = role["rolename"].split(":")[1] + break + except Exception: + continue + else: + raise RuntimeError("openwb-version role not found in default-dynamic-security.json") + + if current_version != template_version: + log.debug(f"Current ACL version: '{current_version}' Template version: '{template_version}'") + return current_version, template_version + + def get_template_role_data() -> Optional[MosquittoRole]: + for config_role in dynsec_config["roles"]: + if (config_role["rolename"] == role_data["rolename"] or + (VERSION_STRING in config_role["rolename"] and VERSION_STRING in role_data["rolename"])): + return config_role + for config_role in role_templates_config: + pattern = config_role["rolename"].replace("", r"\d+") + if re.match(pattern, role_data["rolename"]): + return config_role + raise RuntimeError(f"Role {role_data['rolename']} not found in default-dynamic-security.json") + + try: + roles = _list_acl_roles() + with open(_get_packages_path()/"data/config/mosquitto/public/default-dynamic-security.json", "r") as f: + dynsec_config = json.load(f) + current_version, template_version = get_acl_versions() + if current_version != template_version: + with open(_get_packages_path()/"data/config/mosquitto/public/role-templates.json", "r") as f: + role_templates_config = json.load(f) + for role_name in roles: + try: + role_data = _get_configured_role_data(role_name) + template_role_data = get_template_role_data() + # entferne Rollen, die in der Konfigurationsdatei nicht vorhanden sind + if template_role_data is None: + log.debug(f"Rolle '{role_data['rolename']}' existiert nicht in den Konfigurationsdateien und" + " wird gelöscht.") + run_command(["mosquitto_ctrl", "dynsec", "deleteRole", role_data["rolename"]]) + + # entferne ACLs aus der Rolle, wenn diese im Template nicht vorhanden sind + for acl in role_data["acls"]: + for template_acl in template_role_data["acls"]: + if template_acl == acl: + break + else: + log.debug(f"ACL {acl['acltype']}:{'allow' if acl['allow'] else 'deny'}:{acl['topic']}:" + f"{acl['priority']} in Rolle {role_data['rolename']} wird entfernt.") + run_command([ + "mosquitto_ctrl", "dynsec", "removeRoleAcl", role_data["rolename"], + acl["acltype"], acl["topic"] + ]) + # ergänze zusätzliche ACLs aus dem Template in der Rollen + for acl in template_role_data["acls"]: + for role_acl in role_data["acls"]: + if acl == role_acl: + break + else: + rolename_id = _extract_id_from_role_name(role_data["rolename"]) + if rolename_id is not None: + acl["topic"] = acl["topic"].replace("", str(rolename_id)) + log.debug(f"ACL {acl['acltype']}:{'allow' if acl['allow'] else 'deny'}:{acl['topic']}:" + f"{acl['priority']} in Rolle {role_data['rolename']} wird hinzugefügt.") + run_command([ + "mosquitto_ctrl", "dynsec", "addRoleAcl", role_data["rolename"], + acl["acltype"], acl["topic"], + "allow" if acl["allow"] else "deny", + str(acl["priority"]) + ]) + except Exception: + log.exception(f"Fehler beim Aktualisieren der Rolle '{role_name}'") + # bei allen anderen Rollen dürfen nur die ACLs editiert werden, + # damit diese in den Benutzergruppen erhalten bleiben + run_command(["mosquitto_ctrl", "dynsec", "deleteRole", f"{VERSION_STRING}{current_version}"]) + run_command(["mosquitto_ctrl", "dynsec", "createRole", f"{VERSION_STRING}{template_version}"]) + except Exception: + log.exception("Fehler beim Aktualisieren der ACLs") + + +def _get_packages_path() -> Path: + return Path(__file__).resolve().parents[2]