diff --git a/src/butter_backup/cli.py b/src/butter_backup/cli.py index 3394644..9c6e73e 100644 --- a/src/butter_backup/cli.py +++ b/src/butter_backup/cli.py @@ -16,6 +16,8 @@ from . import backup_backends as bb from . import config_parser as cp from .device_managers import ( + decrypted_device, + open_encrypted_device, prepare_device_for_butterbackend, prepare_device_for_resticbackend, ) @@ -132,7 +134,7 @@ def open( # noqa: A001 ): continue mount_dir = Path(mkdtemp()) - decrypted = sdm.open_encrypted_device(cfg.device(), cfg.DevicePassCmd) + decrypted = open_encrypted_device(cfg.device(), cfg.DevicePassCmd) match cfg: case cp.BtrFSRsyncConfig(): sdm.mount_btrfs_device( @@ -206,7 +208,7 @@ def backup(config: Path = CONFIG_OPTION, verbose: int = VERBOSITY_OPTION) -> Non ): continue backend = bb.BackupBackend.from_config(cfg) - with sdm.decrypted_device(cfg.device(), cfg.DevicePassCmd) as decrypted: + with decrypted_device(cfg.device(), cfg.DevicePassCmd) as decrypted: match cfg: case cp.BtrFSRsyncConfig(): compression = cfg.Compression diff --git a/src/butter_backup/device_managers.py b/src/butter_backup/device_managers.py index 964e716..d9d3623 100644 --- a/src/butter_backup/device_managers.py +++ b/src/butter_backup/device_managers.py @@ -1,3 +1,5 @@ +import contextlib +import subprocess import typing as t from datetime import date from pathlib import Path @@ -7,6 +9,49 @@ from . import config_parser as cp + +class DevicePassCmdError(RuntimeError): + """Raised when DevicePassCmd fails to execute successfully.""" + + +def open_encrypted_device(device: Path, pass_cmd: str) -> Path: + """Open an encrypted device, distinguishing pass_cmd failures from decryption failures. + + Unlike sdm.open_encrypted_device, this function raises DevicePassCmdError + when the password command itself fails, and DeviceDecryptionError only when + the actual decryption (cryptsetup) fails. + """ + map_name = device.name + decrypt_cmd = ["sudo", "cryptsetup", "open", str(device), map_name] + try: + pwd_proc = subprocess.run( + pass_cmd, stdout=subprocess.PIPE, shell=True, check=True + ) + except subprocess.CalledProcessError as e: + raise DevicePassCmdError( + f"DevicePassCmd '{pass_cmd}' failed to execute." + ) from e + try: + subprocess.run(decrypt_cmd, input=pwd_proc.stdout, check=True) + except subprocess.CalledProcessError as e: + raise sdm.DeviceDecryptionError() from e + return Path("/dev/mapper/") / map_name + + +@contextlib.contextmanager +def decrypted_device(device: Path, pass_cmd: str) -> t.Iterator[Path]: + """Context manager that opens and closes an encrypted device. + + Like sdm.decrypted_device, but distinguishes DevicePassCmdError from + DeviceDecryptionError. + """ + decrypted = open_encrypted_device(device, pass_cmd) + try: + yield decrypted + finally: + sdm.close_decrypted_device(decrypted) + + ValidFileSystems = t.Literal["ext4", "btrfs"] diff --git a/tests/test_device_managers.py b/tests/test_device_managers.py new file mode 100644 index 0000000..6bf167f --- /dev/null +++ b/tests/test_device_managers.py @@ -0,0 +1,53 @@ +from pathlib import Path + +import pytest +import storage_device_managers as sdm + +from butter_backup import device_managers as dm + + +def in_docker_container() -> bool: + return Path("/.dockerenv").exists() + + +@pytest.mark.skipif( + in_docker_container(), reason="Test is known to fail in Docker container" +) +def test_open_encrypted_device_raises_device_pass_cmd_error_on_failing_pass_cmd( + encrypted_device, +) -> None: + """When DevicePassCmd fails, DevicePassCmdError must be raised, not DeviceDecryptionError.""" + device = encrypted_device.device() + failing_pass_cmd = "false" # always exits with non-zero status + + with pytest.raises(dm.DevicePassCmdError): + dm.open_encrypted_device(device, failing_pass_cmd) + + +@pytest.mark.skipif( + in_docker_container(), reason="Test is known to fail in Docker container" +) +def test_decrypted_device_raises_device_pass_cmd_error_on_failing_pass_cmd( + encrypted_device, +) -> None: + """When DevicePassCmd fails, DevicePassCmdError must be raised, not DeviceDecryptionError.""" + device = encrypted_device.device() + failing_pass_cmd = "false" # always exits with non-zero status + + with pytest.raises(dm.DevicePassCmdError): + with dm.decrypted_device(device, failing_pass_cmd): + pass # pragma: no cover + + +@pytest.mark.skipif( + in_docker_container(), reason="Test is known to fail in Docker container" +) +def test_open_encrypted_device_raises_device_decryption_error_on_wrong_password( + encrypted_device, +) -> None: + """When DevicePassCmd succeeds but the password is wrong, DeviceDecryptionError must be raised.""" + device = encrypted_device.device() + wrong_password_cmd = "echo wrong_password" + + with pytest.raises(sdm.DeviceDecryptionError): + dm.open_encrypted_device(device, wrong_password_cmd)