Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 139 additions & 1 deletion packages/helpermodules/mosquitto_dynsec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from json import load as json_load
import json
import logging
from pathlib import Path
from typing import TypedDict, Optional
import re
from typing import Tuple, TypedDict, Optional

from helpermodules.subdata import SubData
from helpermodules.utils.run_command import run_command
Expand All @@ -15,6 +17,14 @@ class MosquittoAcl(TypedDict):
allow: bool
priority: int

def __eq__(self, other):
if self["acltype"] == other["acltype"]:
if re.sub(r'/\d+/', '/<id>/', self["topic"]) == re.sub(r'/\d+/', '/<id>/', other["topic"]):
if self["allow"] == other["allow"]:
if self["priority"] == other["priority"]:
return True
return False


class MosquittoRole(TypedDict):
rolename: str
Expand Down Expand Up @@ -83,6 +93,7 @@ def remove_acl_role(role_template: str, id: int):


def check_roles_at_start():
update_acls()
flag_path = Path(Path(__file__).resolve().parents[2]/"ramdisk"/"init_user_management")
if flag_path.is_file():
with open(flag_path, "r") as file:
Expand All @@ -104,3 +115,130 @@ def check_roles_at_start():
if "io" in key:
add_acl_role("io-device-<id>-access", value.config.id)
flag_path.unlink()


def _extract_id_from_role_name(role_name: str) -> Optional[int]:
numbers = re.findall(r'\d+', role_name)
if numbers:
return int(numbers[0])
return None


def _get_configured_role_data(role_name: str) -> Optional[MosquittoRole]:
role_output = run_command([
"mosquitto_ctrl", "dynsec", "getRole", role_name])
# Parse the text output since it's not JSON
role_data = {"rolename": role_name, "acls": []}
lines = role_output.strip().split('\n')
for line in lines[1:]: # Skip first line with rolename
if "ACLs:" in line:
line = line.replace("ACLs:", "")
if line.strip() and ':' in line:
parts = [p.strip() for p in line.split(':')]
if len(parts) >= 4:
acl = {
"acltype": parts[0],
"allow": parts[1] == "allow",
"topic": parts[2].split('(')[0].strip(),
"priority": int(parts[3].replace(')', '').strip())
}
role_data["acls"].append(acl)
return role_data


VERSION_STRING = "openwb-version:"


def update_acls():
def get_acl_versions() -> Tuple[str, str]:
for role in roles:
if VERSION_STRING in role:
current_version = role.split(":")[1]
break
else:
raise RuntimeError("openwb-version role not found")

for role in dynsec_config["roles"]:
try:
if VERSION_STRING in role["rolename"]:
template_version = role["rolename"].split(":")[1]
break
except Exception:
continue
else:
raise RuntimeError("openwb-version role not found in default-dynamic-security.json")

if current_version != template_version:
log.debug(f"Current ACL version: '{current_version}' Template version: '{template_version}'")
return current_version, template_version

def get_template_role_data() -> Optional[MosquittoRole]:
for config_role in dynsec_config["roles"]:
if (config_role["rolename"] == role_data["rolename"] or
(VERSION_STRING in config_role["rolename"] and VERSION_STRING in role_data["rolename"])):
return config_role
for config_role in role_templates_config:
pattern = config_role["rolename"].replace("<id>", r"\d+")
if re.match(pattern, role_data["rolename"]):
return config_role
raise RuntimeError(f"Role {role_data['rolename']} not found in default-dynamic-security.json")

try:
roles = _list_acl_roles()
with open(_get_packages_path()/"data/config/mosquitto/public/default-dynamic-security.json", "r") as f:
dynsec_config = json.load(f)
current_version, template_version = get_acl_versions()
if current_version != template_version:
with open(_get_packages_path()/"data/config/mosquitto/public/role-templates.json", "r") as f:
role_templates_config = json.load(f)
for role_name in roles:
try:
role_data = _get_configured_role_data(role_name)
template_role_data = get_template_role_data()
# entferne Rollen, die in der Konfigurationsdatei nicht vorhanden sind
if template_role_data is None:
log.debug(f"Rolle '{role_data['rolename']}' existiert nicht in den Konfigurationsdateien und"
" wird gelöscht.")
run_command(["mosquitto_ctrl", "dynsec", "deleteRole", role_data["rolename"]])

# entferne ACLs aus der Rolle, wenn diese im Template nicht vorhanden sind
for acl in role_data["acls"]:
for template_acl in template_role_data["acls"]:
if template_acl == acl:
break
else:
log.debug(f"ACL {acl['acltype']}:{'allow' if acl['allow'] else 'deny'}:{acl['topic']}:"
f"{acl['priority']} in Rolle {role_data['rolename']} wird entfernt.")
run_command([
"mosquitto_ctrl", "dynsec", "removeRoleAcl", role_data["rolename"],
acl["acltype"], acl["topic"]
])
# ergänze zusätzliche ACLs aus dem Template in der Rollen
for acl in template_role_data["acls"]:
for role_acl in role_data["acls"]:
if acl == role_acl:
break
else:
rolename_id = _extract_id_from_role_name(role_data["rolename"])
if rolename_id is not None:
acl["topic"] = acl["topic"].replace("<id>", str(rolename_id))
log.debug(f"ACL {acl['acltype']}:{'allow' if acl['allow'] else 'deny'}:{acl['topic']}:"
f"{acl['priority']} in Rolle {role_data['rolename']} wird hinzugefügt.")
run_command([
"mosquitto_ctrl", "dynsec", "addRoleAcl", role_data["rolename"],
acl["acltype"], acl["topic"],
"allow" if acl["allow"] else "deny",
str(acl["priority"])
])
except Exception:
log.exception(f"Fehler beim Aktualisieren der Rolle '{role_name}'")
# bei allen anderen Rollen dürfen nur die ACLs editiert werden,
# damit diese in den Benutzergruppen erhalten bleiben
run_command(["mosquitto_ctrl", "dynsec", "deleteRole", f"{VERSION_STRING}{current_version}"])
run_command(["mosquitto_ctrl", "dynsec", "createRole", f"{VERSION_STRING}{template_version}"])
except Exception:
log.exception("Fehler beim Aktualisieren der ACLs")


def _get_packages_path() -> Path:
return Path(__file__).resolve().parents[2]