diff --git a/docs/extension-handlers.md b/docs/extension-handlers.md index 15fd7ac..6cd92e4 100644 --- a/docs/extension-handlers.md +++ b/docs/extension-handlers.md @@ -655,75 +655,5 @@ end-to-end example. ## Python wrapper -The `sandlock.handler` module provides a Python-side wrapper on top of -the C ABI. See `python/tests/test_handler_smoke.py` for working -examples. - -### Minimal example - -```python -import sandlock -from sandlock.handler import ExceptionPolicy, Handler, NotifAction - -class AuditOpens(Handler): - on_exception = ExceptionPolicy.CONTINUE # audit-only — never block - - def handle(self, ctx): - path = ctx.read_cstr(ctx.args[1], max_len=4096) - print(f"opening {path!r}") - return NotifAction.continue_() - -sb = sandlock.Sandbox(fs_readable=["/usr", "/etc", "/lib", "/lib64", "/bin"]) -sb.run_with_handlers( - cmd=["/usr/bin/cat", "/etc/hostname"], - handlers=[("openat", AuditOpens())], -) -``` - -Each handler is registered for one syscall. The key is a syscall name -(`str`, e.g. `"openat"`), resolved for the host architecture, or a raw -kernel syscall number (`int`). Prefer the name — raw numbers are -architecture-specific (`openat` is 257 on x86_64 but 56 on aarch64). A -name sandlock cannot resolve raises `ValueError`; syscalls sandlock does -not filter (e.g. `getpid`) are not name-resolvable and must be passed as -an `int`. C callers can resolve a name with `sandlock_syscall_nr`. - -### Threading & safety contract - -- **GIL contention.** Each handler dispatch holds the GIL for the - duration of `handle()`. The supervisor may dispatch handler - callbacks concurrently across different notifications, so design - `handle()` to be fast (sub-millisecond) and to protect any mutable - handler state with your own synchronization. High-frequency - interception (e.g. per-`SYS_openat` audit on a busy workload) will - serialize on the GIL and can stall the supervisor. - -- **Interpreter finalization.** If `Py_FinalizeEx` runs while the - sandbox is still alive (e.g. the main thread exits with handlers - still registered), the trampoline checks `Py_IsInitialized()` and - returns an error, routing the notification through the handler's - `on_exception` policy. Do not rely on this for clean shutdown — wait - for the run to finish before tearing down the interpreter. - -- **Native crashes inside `handle()`.** A segfault inside a Python - handler is not recoverable: the supervisor task hangs and the - trapped child is held indefinitely. Write defensive handlers; this - is a user responsibility. - -- **Tokio runtime reentrancy.** The C ABI's `sandlock_run_with_handlers` - builds and drives its own Tokio runtime internally. Do not call - `Sandbox.run_with_handlers` from a thread that already runs a Tokio - runtime — the FFI will panic, and the panic surfaces as a Python - exception. Pure-Python use (the common case) is unaffected. - -### Ownership rules - -- **Handler instances** must outlive the run. The Sandbox holds a - strong reference for the duration of the run; the reference is - released when the run completes (success or failure). - -- **File descriptors** passed via `NotifAction.inject_fd_send(srcfd)` - transfer ownership to the supervisor on dispatch. The Python caller - must NOT close `srcfd` afterwards, regardless of whether the action - was actually dispatched — the supervisor handles cleanup on all - paths. +See [`python-handlers.md`](python-handlers.md) — the dedicated page is the +single source of truth for the Python wrapper. diff --git a/docs/python-handlers.md b/docs/python-handlers.md new file mode 100644 index 0000000..c53cdb2 --- /dev/null +++ b/docs/python-handlers.md @@ -0,0 +1,279 @@ +# Python handlers + +The `sandlock.handler` Python wrapper layers a pythonic API over the C ABI +exposed by `sandlock-ffi`. This page is the single source of truth for the +Python wrapper; for the underlying C/Rust contract see +[`extension-handlers.md`](extension-handlers.md). + +## Quick start + +```python +import sandlock +from sandlock.presets import AuditPathsHandler, COMMON_PATH_SYSCALLS + +audit = AuditPathsHandler(callback=lambda path, _ctx: print(f"open {path}")) +sb = sandlock.Sandbox(fs_readable=["/usr", "/etc", "/lib", "/bin"]) +sb.run_with_handlers( + cmd=["/usr/bin/cat", "/etc/hostname"], + handlers=[(s, audit) for s in COMMON_PATH_SYSCALLS], +) +``` + +## Core types + +- `Handler` — base class. Subclass and override `handle(ctx) -> NotifAction`. + Set the class attribute `on_exception` (default `ExceptionPolicy.KILL`) to + choose what the supervisor does when the handler errors. +- `HandlerCtx` — frozen dataclass with the notification fields (`id`, `pid`, + `flags`, `syscall_nr`, `arch`, `instruction_pointer`, `args`) plus + child-memory accessors. +- `NotifAction` — frozen value-object. Construct via factory classmethods: + `continue_()`, `errno(value)`, `returns(value)`, `hold()`, + `kill(sig, pgid)`, `inject_fd_send(srcfd, newfd_flags)`. +- `ExceptionPolicy` — IntEnum: `KILL` (default), `DENY_EPERM`, `CONTINUE`, + `DENY_EIO`. + +## HandlerCtx accessors + +### `read_cstr(addr, max_len) -> str | None` + +Read a NUL-terminated string from the child at `addr`. Returns the decoded +string on success, `None` on failure (invalid address, race, or no live +mem handle). + +### `read(addr, length) -> bytes | None` + +Read `length` raw bytes. Returns the bytes on success, `None` on failure. + +### `write(addr, data) -> bool` + +Write `data` into the child memory at `addr`. Returns `True` on success. + +### `read_path(arg=None, max_len=4096) -> str | None` + +Resolve a path-bearing syscall argument to a Python string. With +`arg=None` (default), the path-argument index is inferred from +`ctx.syscall_nr` via a name-keyed table. Multi-path syscalls +(`renameat2`, `rename`, `linkat`, `link`, `symlinkat`, `symlink`) and +unknown syscalls raise `ValueError` — pass `arg=` explicitly in those +cases. + +Known single-path syscalls (auto-inferred): + +| Syscall | path arg | +|---|---| +| `openat`, `unlinkat`, `mkdirat`, `newfstatat`, `statx`, `faccessat`, `readlinkat`, `execveat` | 1 | +| `open`, `unlink`, `mkdir`, `rmdir`, `stat`, `lstat`, `access`, `readlink`, `execve` | 0 | + +Multi-path syscalls — call twice with explicit `arg=`: + +```python +def handle(self, ctx): + # renameat2(olddirfd, oldpath, newdirfd, newpath, flags) + src = ctx.read_path(arg=1) + dst = ctx.read_path(arg=3) + return NotifAction.continue_() +``` + +A live `HandlerCtx` returns the decoded string; a stale or absent mem +handle returns `None`. + +## Preset handlers + +Imported from `sandlock.presets`. The preset classes are deliberately +NOT re-exported from the root `sandlock` package — root surface stays +minimal; callers reach for presets when they want them. (The core +handler types — `Handler`, `NotifAction`, `HandlerCtx`, +`ExceptionPolicy` — *are* re-exported at the root.) + +### `COMMON_PATH_SYSCALLS` + +The set of modern path-bearing syscalls a generic file-operation handler +is typically registered against: + +```python +COMMON_PATH_SYSCALLS = [ + "openat", "unlinkat", "newfstatat", "statx", "faccessat", + "readlinkat", "mkdirat", "execveat", "execve", +] +``` + +Used with a list comprehension to register one handler on all of them: + +```python +handlers = [(s, my_handler) for s in COMMON_PATH_SYSCALLS] +``` + +### `AuditPathsHandler(callback, max_len=4096)` + +- `on_exception=CONTINUE` — audit must never block. +- Calls `callback(path, ctx)` on every intercepted syscall (including + when `path is None`, so the caller sees "couldn't read"). +- Returns `NotifAction.continue_()`. + +### `PathDenyHandler(deny: list[str], errno=errno.EPERM, max_len=4096)` + +- `on_exception=KILL` — security handler, fail-closed. +- `deny` is a `list[str]` of `fnmatch` patterns; passing a single string + raises `TypeError` (the API is uniform). +- If `path` matches any pattern, returns `NotifAction.errno(errno)`; + otherwise returns `NotifAction.continue_()`. +- **`path is None` → `continue_()`.** A deny-list does not claim + "everything else is allowed", so when the path cannot be classified we + defer to Landlock and other handlers in the chain. + +### `PathAllowHandler(allow: list[str], errno=errno.EACCES, max_len=4096)` + +- `on_exception=KILL` — security handler, fail-closed. +- `allow` is a `list[str]` of `fnmatch` patterns; passing a single string + raises `TypeError`. +- If `path` matches any pattern, returns `NotifAction.continue_()`; + otherwise returns `NotifAction.errno(errno)`. +- **`path is None` → `errno(errno)`.** An allow-list claims "everything + except the listed paths is denied", so failing to verify means failing + closed. + +### `LogSyscallsHandler(logger=None)` + +- `on_exception=CONTINUE` — observational. +- Logs one line per intercepted syscall: + `syscall=N pid=P args=(a0, a1, a2, a3, a4, a5)`. +- Default `logger` is `logging.getLogger("sandlock.audit").info`. Any + `Callable[[str], None]` works (e.g. `list.append` in tests). +- If `logger` raises, the exception is absorbed by `on_exception=CONTINUE` + — the child proceeds but the log line is silently lost. + +## Recipes + +### Audit every common path syscall + +```python +audit = AuditPathsHandler(callback=lambda path, _ctx: print(path)) +sb.run_with_handlers(cmd, [(s, audit) for s in COMMON_PATH_SYSCALLS]) +``` + +### Deny a directory tree + +```python +deny = PathDenyHandler(deny=["/etc/*", "/var/lib/*"]) +sb.run_with_handlers(cmd, [(s, deny) for s in COMMON_PATH_SYSCALLS]) +``` + +### Allow-list paths (fail-closed) + +```python +allow = PathAllowHandler(allow=["/tmp/sandbox/*", "/usr/lib/*"]) +sb.run_with_handlers(cmd, [(s, allow) for s in COMMON_PATH_SYSCALLS]) +``` + +Anything not under those prefixes returns `EACCES`. Any syscall whose +path the handler cannot read also returns `EACCES` (fail-closed). + +### Synthesise a return value + +```python +from sandlock.handler import Handler, NotifAction, ExceptionPolicy + +class FakePid(Handler): + on_exception = ExceptionPolicy.KILL + + def handle(self, ctx): + return NotifAction.returns(777) + +sb.run_with_handlers(cmd, [("getpid", FakePid())]) +``` + +### Kill the child from a handler + +```python +import signal +from sandlock.handler import Handler, NotifAction + +class KillOnEtc(Handler): + def handle(self, ctx): + path = ctx.read_path() + if path and path.startswith("/etc/"): + return NotifAction.kill(signal.SIGKILL, pgid=0) + return NotifAction.continue_() +``` + +**Caveat:** `ctx.read_path()` without an explicit `arg=` raises +`ValueError` for syscalls not in the known path table (see the +`read_path` accessor section). Under the default +`on_exception=KILL` policy that `ValueError` becomes a kill signal +to the child. Either register the handler only against syscalls in +`COMMON_PATH_SYSCALLS`, pass `arg=` explicitly, or set +`on_exception=ExceptionPolicy.CONTINUE` on your handler. + +### Combine multiple handlers on one syscall + +Register multiple handlers on the same syscall — the supervisor calls +them in registration order, stopping at the first non-`Continue` action: + +```python +sb.run_with_handlers(cmd, [ + ("openat", audit), + ("openat", deny), +]) +``` + +On syscalls that already carry built-in handlers (`openat` for chroot +path normalization, COW write redirection, procfs virtualization; +`clone`/`fork`/`execve` for resource accounting; and others), user +handlers are appended **after** all builtins — see +`build_dispatch_table` in +`crates/sandlock-core/src/seccomp/dispatch.rs`. Dispatch short-circuits +on the first non-`Continue` action, so a user handler only fires if +every built-in for that syscall first returned `Continue`. Built-ins +cannot be overridden or removed; this is the security boundary. When +testing security-critical user handlers (e.g. `PathDenyHandler` on +`openat`), exercise them against the actual built-in set on your +syscall list rather than against an empty dispatch table. + +## Threading & safety contract + +- **GIL contention.** Each handler dispatch holds the GIL for the + duration of `handle()`. The supervisor may dispatch handler + callbacks concurrently across different notifications, so design + `handle()` to be fast (sub-millisecond) and to protect any mutable + handler state with your own synchronization. High-frequency + interception (e.g. per-`SYS_openat` audit on a busy workload) will + serialize on the GIL and can stall the supervisor. + +- **Interpreter finalization.** If `Py_FinalizeEx` runs while the + sandbox is still alive (e.g. the main thread exits with handlers + still registered), the trampoline checks `Py_IsInitialized()` and + returns an error, routing the notification through the handler's + `on_exception` policy. Do not rely on this for clean shutdown — wait + for the run to finish before tearing down the interpreter. + +- **Native crashes inside `handle()`.** A segfault inside a Python + handler is not recoverable: the supervisor task hangs and the + trapped child is held indefinitely. Write defensive handlers; this + is a user responsibility. + +- **Tokio runtime reentrancy.** The C ABI's `sandlock_run_with_handlers` + builds and drives its own Tokio runtime internally. Do not call + `Sandbox.run_with_handlers` from a thread that already runs a Tokio + runtime — the FFI will panic, and the panic surfaces as a Python + exception. Pure-Python use (the common case) is unaffected. + +## Ownership rules + +- **Handler instances** must outlive the run. The Sandbox holds a + strong reference for the duration of the run; the reference is + released when the run completes (success or failure). + +- **File descriptors** passed via `NotifAction.inject_fd_send(srcfd)` + transfer ownership to the supervisor on dispatch. The Python caller + must NOT close `srcfd` afterwards, regardless of whether the action + was actually dispatched — the supervisor handles cleanup on all + paths. + +## C ABI + +The Python wrapper sits on the C ABI declared in +`crates/sandlock-ffi/include/sandlock.h`. For the C ABI contract, +exception policies at the supervisor level, and ownership across the +boundary see the C/Rust sections of +[`extension-handlers.md`](extension-handlers.md). diff --git a/python/src/sandlock/handler.py b/python/src/sandlock/handler.py index 3f76281..60ee6f5 100644 --- a/python/src/sandlock/handler.py +++ b/python/src/sandlock/handler.py @@ -38,7 +38,7 @@ class NotifAction: - CONTINUE: no payload fields used. - ERRNO: ``errno_value`` set. - - RETURN_VALUE: ``return_value`` set (factory: ``return_value_``). + - RETURN_VALUE: ``return_value`` set (factory: ``returns``). - INJECT_FD_SEND: ``srcfd``, ``newfd_flags`` set; the supervisor takes ownership of the fd on dispatch. - HOLD: no payload fields used. @@ -68,7 +68,7 @@ def errno(cls, value: int) -> NotifAction: return cls(kind=int(_ActionKind.ERRNO), errno_value=value) @classmethod - def return_value_(cls, value: int) -> NotifAction: + def returns(cls, value: int) -> NotifAction: return cls(kind=int(_ActionKind.RETURN_VALUE), return_value=value) @classmethod @@ -152,6 +152,61 @@ def handle(self, ctx: HandlerCtx) -> NotifAction: ) +# Single-path syscalls — name -> 0-based index of the path argument. +# Keys are syscall NAMES (arch-agnostic); the reverse map below resolves +# them to host-arch numbers via the C ABI's sandlock_syscall_nr. +_PATH_ARG: dict[str, int] = { + "openat": 1, "open": 0, + "unlinkat": 1, "unlink": 0, + "mkdirat": 1, "mkdir": 0, + "rmdir": 0, + "newfstatat": 1, "statx": 1, "stat": 0, "lstat": 0, + "faccessat": 1, "access": 0, + "readlinkat": 1, "readlink": 0, + "execve": 0, "execveat": 1, +} + +# Multi-path syscalls — listed only so we give a helpful error message +# instructing the caller to pass arg= explicitly. +_MULTI_PATH: set[str] = { + "renameat2", "rename", "linkat", "link", "symlinkat", "symlink", +} + +# Invariant: a syscall is either single-path (entry in _PATH_ARG) or +# multi-path (entry in _MULTI_PATH), never both. Checked at module load +# so a future edit that puts a multi-path name into _PATH_ARG fails +# loudly instead of silently letting auto-arg resolve a "primary" path +# for a multi-path syscall. +assert not (set(_PATH_ARG) & _MULTI_PATH), ( + "single-path / multi-path tables must be disjoint; overlap: " + f"{set(_PATH_ARG) & _MULTI_PATH}" +) + +# Lazy, memoised: syscall_nr -> name. Built on first read_path() call by +# resolving every known name via the C ABI's sandlock_syscall_nr. Names +# the host kernel does not know are silently absent (read_path() then +# reports the number as unknown, which is correct). +_REVERSE_PATH_TABLE: dict[int, str] | None = None + + +def _reverse_path_table() -> dict[int, str]: + global _REVERSE_PATH_TABLE + if _REVERSE_PATH_TABLE is None: + from ._sdk import _lib + rev: dict[int, str] = {} + for name in list(_PATH_ARG) + list(_MULTI_PATH): + nr = _lib.sandlock_syscall_nr(name.encode()) + # Names absent on this kernel are intentionally skipped — read_path() + # then reports the number as unknown, which is the spec'd behaviour + # (see PR 3 design doc). Note: a syscall the kernel does not know + # cannot be registered for handler dispatch via sandlock_syscall_nr, + # so a notification with that nr cannot arrive in practice. + if nr >= 0: + rev[nr] = name + _REVERSE_PATH_TABLE = rev + return _REVERSE_PATH_TABLE + + class _MemHandle: """Mutable wrapper around the opaque child-memory handle. @@ -248,3 +303,36 @@ def write(self, addr: int, data: bytes) -> bool: return False from . import _handler_ffi return _handler_ffi.mem_write(cell.ptr, addr, data) + + def read_path(self, arg: int | None = None, max_len: int = 4096) -> str | None: + """Read a path syscall argument from the child as a Python string. + + With ``arg=None`` (default) the path-argument index is inferred + from ``self.syscall_nr`` via a name-keyed table. Multi-path + syscalls (renameat2, rename, linkat, link, symlinkat, symlink) + and syscalls not in the table raise ``ValueError`` — pass + ``arg=`` explicitly in those cases. + + With an explicit ``arg`` (0..5), reads the path from + ``self.args[arg]`` without consulting the table. + + On a live ``HandlerCtx`` returns the decoded string; on a stale + or absent mem handle returns ``None`` (the behaviour of + :meth:`read_cstr`). + """ + if arg is None: + name = _reverse_path_table().get(self.syscall_nr) + if name is None: + raise ValueError( + f"read_path: syscall_nr {self.syscall_nr} is not a known " + f"path syscall — pass arg= explicitly" + ) + if name in _MULTI_PATH: + raise ValueError( + f"read_path: {name} has multiple path args — pass arg= " + f"explicitly (e.g. arg=1 and arg=3 for renameat2)" + ) + arg = _PATH_ARG[name] + if not (0 <= arg < 6): + raise ValueError(f"read_path: arg must be in 0..5, got {arg}") + return self.read_cstr(self.args[arg], max_len) diff --git a/python/src/sandlock/presets.py b/python/src/sandlock/presets.py new file mode 100644 index 0000000..f085897 --- /dev/null +++ b/python/src/sandlock/presets.py @@ -0,0 +1,161 @@ +# SPDX-License-Identifier: Apache-2.0 +"""Preset Handler classes for common interception patterns. + +Imported explicitly: + + from sandlock.presets import AuditPathsHandler, PathDenyHandler, \ + PathAllowHandler, LogSyscallsHandler, COMMON_PATH_SYSCALLS + +The root ``sandlock`` package deliberately does not re-export these — the +root surface stays minimal; callers reach for presets when they want them. +""" + +from __future__ import annotations + +import errno as _errno +import fnmatch +import logging +from typing import Callable + +from .handler import ExceptionPolicy, Handler, HandlerCtx, NotifAction + + +# Modern path-bearing syscalls a generic file-operation handler is typically +# registered against. Used with a list comprehension: +# +# sb.run_with_handlers(cmd, [(s, handler) for s in COMMON_PATH_SYSCALLS]) +COMMON_PATH_SYSCALLS: list[str] = [ + "openat", "unlinkat", "newfstatat", "statx", "faccessat", + "readlinkat", "mkdirat", "execveat", "execve", +] + + +class AuditPathsHandler(Handler): + """Call ``callback(path, ctx)`` on every intercepted path syscall. + + ``on_exception=CONTINUE`` — audit must never block the child. ``path`` + is whatever :meth:`HandlerCtx.read_path` returns for the syscall; the + callback is invoked even when it is ``None`` so the caller sees + "couldn't read". + + ``callback`` may be invoked concurrently on the same handler instance + from different supervisor worker threads — if it mutates shared state, + the caller must provide its own synchronization. + """ + + on_exception = ExceptionPolicy.CONTINUE + + def __init__( + self, + callback: Callable[[str | None, HandlerCtx], None], + max_len: int = 4096, + ) -> None: + self.callback = callback + self.max_len = max_len + + def handle(self, ctx: HandlerCtx) -> NotifAction: + path = ctx.read_path(max_len=self.max_len) + self.callback(path, ctx) + return NotifAction.continue_() + + +class PathDenyHandler(Handler): + """Deny syscalls whose path matches any ``fnmatch`` pattern in ``deny``. + + ``on_exception=KILL`` — security handler, fail-closed if it itself errors. + + The ``path is None`` case is deliberately permissive: a deny-list does + not claim "everything else is allowed", only "these patterns are + denied". When the path cannot be classified we defer to Landlock and + any other handlers in the chain (``continue_()``). + + Patterns are tested in the order given; the first match wins. + """ + + on_exception = ExceptionPolicy.KILL + + def __init__( + self, + deny: list[str], + errno: int = _errno.EPERM, + max_len: int = 4096, + ) -> None: + if not isinstance(deny, list): + raise TypeError( + f"deny must be a list of str patterns, got {type(deny).__name__}" + ) + self.deny = deny + self.errno = errno + self.max_len = max_len + + def handle(self, ctx: HandlerCtx) -> NotifAction: + path = ctx.read_path(max_len=self.max_len) + if path is None: + return NotifAction.continue_() + for pattern in self.deny: + if fnmatch.fnmatchcase(path, pattern): + return NotifAction.errno(self.errno) + return NotifAction.continue_() + + +class PathAllowHandler(Handler): + """Allow only syscalls whose path matches a pattern in ``allow``; deny others. + + ``on_exception=KILL`` — security handler, fail-closed if it itself errors. + + The ``path is None`` case is deliberately restrictive: an allow-list + claims "everything except the listed paths is denied", so failing to + verify the path means failing closed (deny). + + Patterns are tested in the order given; the first match wins. + """ + + on_exception = ExceptionPolicy.KILL + + def __init__( + self, + allow: list[str], + errno: int = _errno.EACCES, + max_len: int = 4096, + ) -> None: + if not isinstance(allow, list): + raise TypeError( + f"allow must be a list of str patterns, got {type(allow).__name__}" + ) + self.allow = allow + self.errno = errno + self.max_len = max_len + + def handle(self, ctx: HandlerCtx) -> NotifAction: + path = ctx.read_path(max_len=self.max_len) + if path is None: + return NotifAction.errno(self.errno) + for pattern in self.allow: + if fnmatch.fnmatchcase(path, pattern): + return NotifAction.continue_() + return NotifAction.errno(self.errno) + + +class LogSyscallsHandler(Handler): + """Log each intercepted syscall as one line; never modify behaviour. + + ``on_exception=CONTINUE`` — observational handler. The default logger + is ``logging.getLogger("sandlock.audit").info``; pass any + ``Callable[[str], None]`` to redirect (e.g. a list's ``append`` for + tests). + + If ``logger`` raises, the exception is absorbed by + ``on_exception=CONTINUE`` — the child proceeds but the log line is + silently lost. + """ + + on_exception = ExceptionPolicy.CONTINUE + + def __init__(self, logger: Callable[[str], None] | None = None) -> None: + self.logger = logger or logging.getLogger("sandlock.audit").info + + def handle(self, ctx: HandlerCtx) -> NotifAction: + self.logger( + f"syscall={ctx.syscall_nr} pid={ctx.pid} args={ctx.args}" + ) + return NotifAction.continue_() diff --git a/python/tests/test_handler_smoke.py b/python/tests/test_handler_smoke.py index e9b3716..97d0edd 100644 --- a/python/tests/test_handler_smoke.py +++ b/python/tests/test_handler_smoke.py @@ -23,7 +23,7 @@ def test_notif_action_kill_carries_sig_and_pgid(): def test_notif_action_return_value_carries_value(): - a = NotifAction.return_value_(42) + a = NotifAction.returns(42) assert a.kind == 3 assert a.return_value == 42 # field, not the classmethod @@ -491,7 +491,7 @@ def handle(self, ctx): def test_handler_return_value_action_overrides_getpid(): - """A handler returning NotifAction.return_value_(777) must make the + """A handler returning NotifAction.returns(777) must make the child's os.getpid() return the synthetic 777 — only reachable if the trampoline translates the ReturnValue action into sandlock_action_set_return_value.""" @@ -507,7 +507,7 @@ class _FakePid(Handler): on_exception = ExceptionPolicy.KILL def handle(self, ctx): - return NotifAction.return_value_(777) + return NotifAction.returns(777) sb = sandlock.Sandbox(fs_readable=_PYTHON_READABLE) result = sb.run_with_handlers( @@ -819,3 +819,346 @@ def handle(self, ctx): cmd=["/bin/true"], handlers=[("definitely_not_a_real_syscall", _Noop())], ) + + +# ---------------------------------------------------------------- +# HandlerCtx.read_path — name-keyed path-arg resolution. +# ---------------------------------------------------------------- + + +def _openat_nr() -> int: + from sandlock._sdk import _lib + return _lib.sandlock_syscall_nr(b"openat") + + +def _renameat2_nr() -> int: + from sandlock._sdk import _lib + return _lib.sandlock_syscall_nr(b"renameat2") + + +def _make_ctx(*, syscall_nr: int, args=(0, 0, 0, 0, 0, 0)): + from sandlock.handler import HandlerCtx + return HandlerCtx( + id=1, pid=1, flags=0, syscall_nr=syscall_nr, arch=0, + instruction_pointer=0, args=args, + ) + + +def test_read_path_auto_arg_resolves_openat_to_args1(monkeypatch): + """For openat, read_path() with no arg= must read the path from args[1].""" + from sandlock.handler import HandlerCtx + captured = [] + monkeypatch.setattr( + HandlerCtx, "read_cstr", + lambda self, addr, max_len: (captured.append((addr, max_len)) or "/tmp/x"), + ) + ctx = _make_ctx(syscall_nr=_openat_nr(), + args=(0, 0xCAFEBABE, 0, 0, 0, 0)) + result = ctx.read_path() + assert result == "/tmp/x" + assert captured == [(0xCAFEBABE, 4096)] + + +def test_read_path_multi_path_syscall_requires_explicit_arg(): + """renameat2 has two path args; read_path() without arg= must raise.""" + if _renameat2_nr() < 0: + pytest.skip("renameat2 not known to host kernel") + ctx = _make_ctx(syscall_nr=_renameat2_nr()) + with pytest.raises(ValueError, match="renameat2"): + ctx.read_path() + + +def test_read_path_unknown_syscall_requires_explicit_arg(): + """A syscall_nr no path-table entry knows about must raise with + instructions to pass arg= explicitly.""" + ctx = _make_ctx(syscall_nr=99999) + with pytest.raises(ValueError, match="pass arg= explicitly"): + ctx.read_path() + + +def test_read_path_rejects_arg_out_of_range(): + ctx = _make_ctx(syscall_nr=_openat_nr()) + for bad in (-1, 6, 99): + with pytest.raises(ValueError, match="arg"): + ctx.read_path(arg=bad) + + +def test_read_path_explicit_arg_works_for_multi_path(monkeypatch): + """An explicit arg= bypasses the multi-path / unknown check.""" + from sandlock.handler import HandlerCtx + monkeypatch.setattr(HandlerCtx, "read_cstr", + lambda self, addr, max_len: "/from") + ctx = _make_ctx(syscall_nr=_renameat2_nr(), + args=(0, 0x1, 0, 0x2, 0, 0)) + assert ctx.read_path(arg=1) == "/from" + + +def test_reverse_path_table_only_contains_known_names(): + """The lazily-built reverse map must contain only names from the two + source tables — locks the contract so a future change cannot leak + other names into auto-arg resolution.""" + from sandlock.handler import ( + _PATH_ARG, _MULTI_PATH, _reverse_path_table, + ) + rev = _reverse_path_table() + known = set(_PATH_ARG) | _MULTI_PATH + for nr, name in rev.items(): + assert name in known, f"reverse map leaked unknown name {name!r} for nr={nr}" + + +# ---------------------------------------------------------------- +# Preset handlers — sandlock.presets. +# ---------------------------------------------------------------- + + +def test_common_path_syscalls_lists_modern_path_syscalls(): + from sandlock.presets import COMMON_PATH_SYSCALLS + assert "openat" in COMMON_PATH_SYSCALLS + assert "execveat" in COMMON_PATH_SYSCALLS + assert len(COMMON_PATH_SYSCALLS) == len(set(COMMON_PATH_SYSCALLS)) + + +def test_audit_paths_handler_default_policy_is_continue(): + from sandlock.presets import AuditPathsHandler + h = AuditPathsHandler(callback=lambda p, c: None) + assert h.on_exception == ExceptionPolicy.CONTINUE + + +def test_audit_paths_handler_calls_callback_and_continues(monkeypatch): + from sandlock.handler import HandlerCtx + from sandlock.presets import AuditPathsHandler + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/tmp/probe") + seen = [] + handler = AuditPathsHandler(callback=lambda p, ctx: seen.append((p, ctx.pid))) + action = handler.handle(_make_ctx(syscall_nr=_openat_nr())) + assert action == NotifAction.continue_() + assert seen == [("/tmp/probe", 1)] + + +def test_audit_paths_handler_invokes_callback_even_on_none_path(monkeypatch): + from sandlock.handler import HandlerCtx + from sandlock.presets import AuditPathsHandler + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: None) + seen = [] + handler = AuditPathsHandler(callback=lambda p, c: seen.append(p)) + handler.handle(_make_ctx(syscall_nr=_openat_nr())) + assert seen == [None] + + +def test_presets_not_reexported_from_root(): + """The root sandlock package must stay minimal; presets are imported + explicitly from sandlock.presets.""" + import sandlock + assert not hasattr(sandlock, "AuditPathsHandler") + assert not hasattr(sandlock, "COMMON_PATH_SYSCALLS") + + +def test_path_deny_handler_default_policy_is_kill(): + from sandlock.presets import PathDenyHandler + assert PathDenyHandler(deny=[]).on_exception == ExceptionPolicy.KILL + + +def test_path_deny_handler_rejects_non_list_deny(): + from sandlock.presets import PathDenyHandler + with pytest.raises(TypeError): + PathDenyHandler(deny="/etc/*") # type: ignore[arg-type] + + +def test_path_deny_handler_denies_matching_path(monkeypatch): + from sandlock.handler import HandlerCtx + from sandlock.presets import PathDenyHandler + import errno as _e + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/etc/shadow") + handler = PathDenyHandler(deny=["/etc/*"]) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.errno(_e.EPERM) + + +def test_path_deny_handler_passes_non_matching_path(monkeypatch): + from sandlock.handler import HandlerCtx + from sandlock.presets import PathDenyHandler + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/tmp/ok") + handler = PathDenyHandler(deny=["/etc/*"]) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.continue_() + + +def test_path_deny_handler_continues_on_none_path(monkeypatch): + """Deny-list contract: cannot classify -> defer (continue), not deny.""" + from sandlock.handler import HandlerCtx + from sandlock.presets import PathDenyHandler + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: None) + handler = PathDenyHandler(deny=["/etc/*"]) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.continue_() + + +def test_path_deny_handler_uses_custom_errno(monkeypatch): + """The errno= constructor parameter is the value returned on a match — + not always EPERM. Pins the public-API surface.""" + from sandlock.handler import HandlerCtx + from sandlock.presets import PathDenyHandler + import errno as _e + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/etc/shadow") + handler = PathDenyHandler(deny=["/etc/*"], errno=_e.EACCES) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.errno(_e.EACCES) + + +def test_path_allow_list_handler_default_policy_is_kill(): + from sandlock.presets import PathAllowHandler + assert PathAllowHandler(allow=[]).on_exception == ExceptionPolicy.KILL + + +def test_path_allow_list_handler_rejects_non_list_allow(): + from sandlock.presets import PathAllowHandler + with pytest.raises(TypeError): + PathAllowHandler(allow="/tmp/*") # type: ignore[arg-type] + + +def test_path_allow_list_handler_allows_matching_path(monkeypatch): + from sandlock.handler import HandlerCtx + from sandlock.presets import PathAllowHandler + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/tmp/ok") + handler = PathAllowHandler(allow=["/tmp/*"]) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.continue_() + + +def test_path_allow_list_handler_denies_non_matching_path(monkeypatch): + from sandlock.handler import HandlerCtx + from sandlock.presets import PathAllowHandler + import errno as _e + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/etc/passwd") + handler = PathAllowHandler(allow=["/tmp/*"]) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.errno(_e.EACCES) + + +def test_path_allow_list_handler_denies_on_none_path(monkeypatch): + """Allow-list contract: cannot verify -> deny (fail-closed).""" + from sandlock.handler import HandlerCtx + from sandlock.presets import PathAllowHandler + import errno as _e + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: None) + handler = PathAllowHandler(allow=["/tmp/*"]) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.errno(_e.EACCES) + + +def test_path_allow_list_handler_uses_custom_errno(monkeypatch): + """The errno= constructor parameter overrides the default on a + non-match. Pins the public-API surface (symmetric with the deny test).""" + from sandlock.handler import HandlerCtx + from sandlock.presets import PathAllowHandler + import errno as _e + monkeypatch.setattr(HandlerCtx, "read_path", + lambda self, max_len=4096: "/etc/passwd") + handler = PathAllowHandler(allow=["/tmp/*"], errno=_e.EPERM) + assert handler.handle(_make_ctx(syscall_nr=_openat_nr())) == \ + NotifAction.errno(_e.EPERM) + + +def test_log_syscalls_handler_default_policy_is_continue(): + from sandlock.presets import LogSyscallsHandler + assert LogSyscallsHandler().on_exception == ExceptionPolicy.CONTINUE + + +def test_log_syscalls_handler_emits_one_line_and_continues(): + from sandlock.presets import LogSyscallsHandler + lines: list[str] = [] + handler = LogSyscallsHandler(logger=lines.append) + ctx = _make_ctx(syscall_nr=_openat_nr(), args=(3, 0xABCD, 0, 0, 0, 0)) + action = handler.handle(ctx) + assert action == NotifAction.continue_() + assert len(lines) == 1 + assert f"syscall={_openat_nr()}" in lines[0] + assert "pid=1" in lines[0] + assert "args=(3, 43981, 0, 0, 0, 0)" in lines[0] + + +def test_audit_paths_handler_e2e_counts_probe_opens(tmp_dir): + """Success criterion: AuditPathsHandler + COMMON_PATH_SYSCALLS observes + the child's opens of a unique probe file the expected number of times. + Exercises the public preset API plus HandlerCtx.read_path() through the + live trampoline.""" + if not os.path.exists(_SYSTEM_PYTHON): + pytest.skip(f"{_SYSTEM_PYTHON} not available") + + from sandlock.presets import AuditPathsHandler, COMMON_PATH_SYSCALLS + + probe = tmp_dir / "preset-probe-file" + probe.write_text("x") + probe_path = str(probe) + + seen: list[str] = [] + seen_all: list[str | None] = [] + + def _cb(path, _ctx): + seen_all.append(path) + if path == probe_path: + seen.append(path) + + audit = AuditPathsHandler(callback=_cb) + + sb = sandlock.Sandbox(fs_readable=[*_PYTHON_READABLE, str(tmp_dir)]) + script = ( + "import os\n" + "for _ in range(3):\n" + " fd = os.open(%r, os.O_RDONLY)\n" + " os.close(fd)\n" % probe_path + ) + result = sb.run_with_handlers( + cmd=[_SYSTEM_PYTHON, "-c", script], + handlers=[(s, audit) for s in COMMON_PATH_SYSCALLS], + ) + + assert result.success, result + assert len(seen) == 3, ( + f"expected 3 opens of probe at {probe_path!r}; got {len(seen)} matched. " + f"Total handler invocations: {len(seen_all)}; " + f"first few paths seen: {seen_all[:10]}" + ) + + +def test_log_syscalls_handler_e2e_observes_openat(tmp_dir): + """LogSyscallsHandler captures one line per intercepted syscall + through the live trampoline. Pin syscall=, pid=, args= structure.""" + if not os.path.exists(_SYSTEM_PYTHON): + pytest.skip(f"{_SYSTEM_PYTHON} not available") + + from sandlock.presets import LogSyscallsHandler + + probe = tmp_dir / "log-syscalls-probe" + probe.write_text("x") + probe_path = str(probe) + + lines: list[str] = [] + handler = LogSyscallsHandler(logger=lines.append) + + sb = sandlock.Sandbox(fs_readable=[*_PYTHON_READABLE, str(tmp_dir)]) + script = ( + "import os\n" + "fd = os.open(%r, os.O_RDONLY)\n" + "os.close(fd)\n" % probe_path + ) + result = sb.run_with_handlers( + cmd=[_SYSTEM_PYTHON, "-c", script], + handlers=[("openat", handler)], + ) + + assert result.success, result + # At least one openat line was captured (the supervisor may also see + # other openat calls from the child's runtime initialization, so + # exact count is not pinned). + assert any("syscall=" in line and "pid=" in line and "args=(" in line + for line in lines), f"no recognizable log line; got {lines[:5]}"