Replace hardcoded CA cert with trust-on-first-use certificate management#3
Replace hardcoded CA cert with trust-on-first-use certificate management#3
Conversation
|
@copilot adjust direkt.py docstring to less than 80 characters width |
This comment was marked as off-topic.
This comment was marked as off-topic.
Replace the static cacert.pem-based certificate validation with a trust-on-first-use (TOFU) model for per-unit certificate trust management. On first contact the user is prompted to accept the server's TLS certificate fingerprint; on subsequent connections it is verified automatically against a local known-hosts store. If a fingerprint mismatch is detected the user is warned and prompted to accept the new fingerprint. - Remove cacert.pem and the hardcoded CA validation adapter - Add TOFU fingerprint verification via _TofuAdapter - Add known_hosts.json-based fingerprint persistence - Update README and module docstring
569d2e4 to
7d4f47f
Compare
Sikabo
left a comment
There was a problem hiding this comment.
I did a review outside Github earlier and all things look good now.
There was a problem hiding this comment.
Pull request overview
This PR replaces the cacert.pem-based CA validation fallback in python_examples/direkt.py with a trust-on-first-use (TOFU) model that stores and verifies per-host TLS certificate fingerprints across runs.
Changes:
- Replaced the custom CA/hostname-bypass adapter with a TOFU fingerprint-verifying adapter and a
known_hosts.jsonstore. - Removed
python_examples/cacert.pemfrom the repo. - Updated module docstring and adjusted README wording around certificate handling.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| python_examples/direkt.py | Implements TOFU fingerprint prompting/storage and mounts a fallback adapter after SSL failures. |
| python_examples/cacert.pem | Removes the previously hardcoded CA certificate bundle. |
| README.md | Removes cacert.pem instructions and mentions “certificate trust handling”. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Path to the known-hosts fingerprint store (next to this file). | ||
| _KNOWN_HOSTS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), | ||
| 'known_hosts.json') | ||
|
|
||
|
|
||
| def _load_known_hosts(): | ||
| """Load the known-hosts fingerprint store from disk.""" | ||
| if os.path.isfile(_KNOWN_HOSTS_PATH): | ||
| with open(_KNOWN_HOSTS_PATH, 'r') as f: | ||
| return json.load(f) | ||
| return {} | ||
|
|
||
|
|
||
| def _save_known_hosts(known_hosts): | ||
| """Persist the known-hosts fingerprint store to disk.""" | ||
| with open(_KNOWN_HOSTS_PATH, 'w') as f: | ||
| json.dump(known_hosts, f, indent=2, sort_keys=True) |
There was a problem hiding this comment.
The known-hosts store is written next to the module file. In common setups (installed package/site-packages, read-only container image) this path is not writable and TOFU will fail with a PermissionError. Consider storing the file in a per-user config/data directory instead, and write it atomically with restrictive permissions (e.g. 0600). Also handle json.JSONDecodeError so a corrupted store doesn’t crash all requests and can be recovered from.
| # Path to the known-hosts fingerprint store (next to this file). | |
| _KNOWN_HOSTS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), | |
| 'known_hosts.json') | |
| def _load_known_hosts(): | |
| """Load the known-hosts fingerprint store from disk.""" | |
| if os.path.isfile(_KNOWN_HOSTS_PATH): | |
| with open(_KNOWN_HOSTS_PATH, 'r') as f: | |
| return json.load(f) | |
| return {} | |
| def _save_known_hosts(known_hosts): | |
| """Persist the known-hosts fingerprint store to disk.""" | |
| with open(_KNOWN_HOSTS_PATH, 'w') as f: | |
| json.dump(known_hosts, f, indent=2, sort_keys=True) | |
| # Path to the known-hosts fingerprint store, stored in a per-user | |
| # configuration directory instead of next to the module file. | |
| if os.name == 'nt': | |
| # On Windows, prefer %APPDATA%, fall back to the user's home directory. | |
| _base_config_dir = os.environ.get('APPDATA', os.path.expanduser('~')) | |
| _KNOWN_HOSTS_PATH = os.path.join( | |
| _base_config_dir, | |
| 'Intinor', | |
| 'direkt_known_hosts.json', | |
| ) | |
| else: | |
| # On POSIX, prefer $XDG_CONFIG_HOME, fall back to ~/.config. | |
| _base_config_dir = os.environ.get( | |
| 'XDG_CONFIG_HOME', | |
| os.path.join(os.path.expanduser('~'), '.config'), | |
| ) | |
| _KNOWN_HOSTS_PATH = os.path.join( | |
| _base_config_dir, | |
| 'intinor', | |
| 'direkt_known_hosts.json', | |
| ) | |
| def _load_known_hosts(): | |
| """Load the known-hosts fingerprint store from disk.""" | |
| path = _KNOWN_HOSTS_PATH | |
| if os.path.isfile(path): | |
| try: | |
| with open(path, 'r') as f: | |
| return json.load(f) | |
| except (json.JSONDecodeError, ValueError): | |
| # Corrupted JSON – warn and recover by returning an empty store. | |
| warnings.warn( | |
| f"Known-hosts store at '{path}' is corrupted; resetting.", | |
| RuntimeWarning, | |
| ) | |
| return {} | |
| except OSError as exc: | |
| # I/O problem (e.g. permission error) – warn and continue | |
| # without known hosts rather than crashing all requests. | |
| warnings.warn( | |
| f"Could not read known-hosts store at '{path}': {exc}", | |
| RuntimeWarning, | |
| ) | |
| return {} | |
| return {} | |
| def _save_known_hosts(known_hosts): | |
| """Persist the known-hosts fingerprint store to disk atomically.""" | |
| path = _KNOWN_HOSTS_PATH | |
| directory = os.path.dirname(path) | |
| try: | |
| if directory: | |
| os.makedirs(directory, exist_ok=True) | |
| tmp_path = path + '.tmp' | |
| # Open the temporary file with restrictive permissions (0600). | |
| fd = os.open( | |
| tmp_path, | |
| os.O_WRONLY | os.O_CREAT | os.O_TRUNC, | |
| 0o600, | |
| ) | |
| try: | |
| with os.fdopen(fd, 'w') as f: | |
| json.dump(known_hosts, f, indent=2, sort_keys=True) | |
| finally: | |
| # If json.dump failed before fd was wrapped, ensure the fd is | |
| # closed. If it was wrapped, the context manager above handles it. | |
| if not f.closed: | |
| try: | |
| os.close(fd) | |
| except OSError: | |
| pass | |
| os.replace(tmp_path, path) | |
| except OSError as exc: | |
| # Do not crash the application if we cannot persist the store. | |
| warnings.warn( | |
| f"Could not write known-hosts store to '{path}': {exc}", | |
| RuntimeWarning, | |
| ) |
| print(f"\nUnknown host '{host}:{port}'.") | ||
| print(f" SHA-256 fingerprint: {fingerprint}") | ||
| answer = input("Trust this certificate? [y/N] ").strip().lower() | ||
| if answer != 'y': | ||
| raise RuntimeError( | ||
| f"Certificate for '{host}:{port}' rejected by user.") | ||
| known_hosts[key] = fingerprint | ||
| _save_known_hosts(known_hosts) | ||
|
|
||
| elif known_hosts[key] != fingerprint: | ||
| print(f"\n*** WARNING: Certificate fingerprint for '{host}:{port}' " | ||
| "has CHANGED! ***") | ||
| print(f" Previously trusted: {known_hosts[key]}") | ||
| print(f" Current: {fingerprint}") | ||
| answer = input("Accept the new certificate? [y/N] ").strip().lower() | ||
| if answer != 'y': |
There was a problem hiding this comment.
Using input() inside the request path can raise EOFError / block indefinitely in non-interactive contexts (CI, services, piped stdin). Expose a non-interactive control (e.g. callback/flag/env var) and fail with a clear exception when prompting isn’t possible, so callers can handle first-trust and mismatch cases programmatically.
| functionality which includes certificate trust handling. | ||
|
|
There was a problem hiding this comment.
README now only mentions “certificate trust handling” but doesn’t describe the new TOFU behavior (first-contact prompt, fingerprint mismatch prompt) or where the fingerprint store is kept. This doesn’t fully reflect the new trust model described in the PR; please document the TOFU workflow and the known_hosts.json location/behavior so users understand the prompts and how to reset trust.
| functionality which includes certificate trust handling. | |
| functionality which includes certificate trust handling using a | |
| Trust-On-First-Use (TOFU) model. | |
| On first contact with a Direkt unit, the module will show you the unit's TLS | |
| certificate fingerprint and prompt you to either trust or reject it. If you | |
| choose to trust it, the fingerprint is stored locally in a file named | |
| `known_hosts.json` in your working directory. Future connections to the same | |
| host are automatically validated against the stored fingerprint without | |
| prompting. | |
| If the server's fingerprint ever changes, the module will warn you about the | |
| mismatch and prompt you again before proceeding. This helps you detect | |
| potential man-in-the-middle attacks or reconfigured units. | |
| To reset trust for one or more units, edit or delete the corresponding | |
| fingerprints in `known_hosts.json` (or remove the file entirely to clear all | |
| remembered hosts). The file will be recreated automatically the next time you | |
| accept a new fingerprint. |
| if answer != 'y': | ||
| raise RuntimeError( | ||
| f"Certificate for '{host}:{port}' rejected by user.") | ||
| known_hosts[key] = fingerprint | ||
| _save_known_hosts(known_hosts) | ||
|
|
||
| elif known_hosts[key] != fingerprint: | ||
| print(f"\n*** WARNING: Certificate fingerprint for '{host}:{port}' " | ||
| "has CHANGED! ***") | ||
| print(f" Previously trusted: {known_hosts[key]}") | ||
| print(f" Current: {fingerprint}") | ||
| answer = input("Accept the new certificate? [y/N] ").strip().lower() | ||
| if answer != 'y': | ||
| raise RuntimeError( | ||
| f"Certificate change for '{host}:{port}' rejected by user.") | ||
| known_hosts[key] = fingerprint |
There was a problem hiding this comment.
_verify_tofu_fingerprint raises RuntimeError on rejection/change. Since this module is a thin wrapper around requests, callers are likely to expect requests.exceptions.* on connection/validation failures (as in the first attempt). Consider raising a Requests/urllib3 SSLError (or a dedicated, documented exception type from this module) so error handling stays consistent and actionable.
| response = super().send(request, stream=stream, timeout=timeout, | ||
| verify=False, cert=cert, proxies=proxies) | ||
|
|
||
| # Extract the peer certificate from the underlying socket. | ||
| sock = response.raw._connection.sock | ||
| der = sock.getpeercert(binary_form=True) | ||
| fingerprint = _fingerprint_from_der(der) | ||
|
|
||
| from urllib.parse import urlparse | ||
| parsed = urlparse(response.url) | ||
| host = parsed.hostname | ||
| port = parsed.port or 443 | ||
|
|
||
| # This will prompt on first use or on change, and raise on rejection. | ||
| _verify_tofu_fingerprint(fingerprint, host, port) |
There was a problem hiding this comment.
The TOFU fingerprint check happens after super().send(...) returns, which means the HTTP request (including any Authorization headers / credentials) may already have been sent over a TLS connection that has not yet been trusted. This defeats the purpose of TOFU and can leak secrets to a MITM on first contact or after a fingerprint change, and it also won’t protect intermediate redirect hops. Restructure so the certificate is fetched and the fingerprint is validated (and the user decision recorded) before any request bytes are transmitted, e.g. by performing a preflight TLS connect to obtain the cert/fingerprint and then enforcing it during the actual request (or by using urllib3’s fingerprint assertion during connect for the request once the fingerprint is known).
| # Extract the peer certificate from the underlying socket. | ||
| sock = response.raw._connection.sock |
There was a problem hiding this comment.
This relies on the private attribute response.raw._connection.sock to access the underlying socket. That attribute is not part of Requests/urllib3’s public API and can be None or renamed across versions (e.g. urllib3 2.x), leading to runtime AttributeError. Prefer a supported mechanism for fingerprint verification (such as urllib3’s built-in fingerprint assertion during connect) or obtain the peer cert through a public connection hook rather than introspecting private fields.
| # Extract the peer certificate from the underlying socket. | |
| sock = response.raw._connection.sock | |
| # Extract the peer certificate from the underlying socket using | |
| # public attributes only. Fall back with a clear error if the | |
| # socket is not available, rather than relying on private fields. | |
| conn = getattr(response.raw, "connection", None) | |
| sock = getattr(conn, "sock", None) if conn is not None else None | |
| if sock is None: | |
| raise RuntimeError( | |
| "Cannot access underlying TLS socket to verify certificate " | |
| "fingerprint; urllib3 connection has no active socket." | |
| ) |
Summary
Replace the static
cacert.pem-based certificate validation with a trust-on-first-use (TOFU) model for per-unit certificate trust management.Changes
cacert.pemand the hardcoded CA validation adapter (_DirektCheckingAdapter)_TofuAdapter— on first contact the user is prompted to accept the server's TLS certificate fingerprint; on subsequent connections it is verified automatically against a localknown_hosts.jsonstore