From 7d4f47f1debdd4171a6170841c727776ea7c69fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Weinehall?= Date: Tue, 17 Feb 2026 14:22:09 +0100 Subject: [PATCH] Replace hardcoded CA cert with trust-on-first-use certificate management Replace the static cacert.pem-based certificate validation with a trust-on-first-use (TOFU) model for per-unit certificate trust management. On first contact the user is prompted to accept the server's TLS certificate fingerprint; on subsequent connections it is verified automatically against a local known-hosts store. If a fingerprint mismatch is detected the user is warned and prompted to accept the new fingerprint. - Remove cacert.pem and the hardcoded CA validation adapter - Add TOFU fingerprint verification via _TofuAdapter - Add known_hosts.json-based fingerprint persistence - Update README and module docstring --- README.md | 7 +- python_examples/cacert.pem | 36 -------- python_examples/direkt.py | 164 ++++++++++++++++++++++++++++--------- 3 files changed, 125 insertions(+), 82 deletions(-) delete mode 100644 python_examples/cacert.pem diff --git a/README.md b/README.md index 6f5148c..5fd0868 100644 --- a/README.md +++ b/README.md @@ -26,12 +26,7 @@ Notes: The "direkt" module provides a best-practice connection mode to your Intinor Direkt unit's API. It wraps the "requests" library with some convenient -functionality which includes certificate handling. - -The cacert.pem file is required to be in the same directory as the -direkt.py file. It verifies the default certificate that is installed -on Direkt units. - +functionality which includes certificate trust handling. For more information visit: intinor.com diff --git a/python_examples/cacert.pem b/python_examples/cacert.pem deleted file mode 100644 index fa423c3..0000000 --- a/python_examples/cacert.pem +++ /dev/null @@ -1,36 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIGGTCCBAGgAwIBAgIJAOv6hDrvcDeaMA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV -BAYTAlNFMRMwEQYDVQQKEwpJbnRpbm9yIEFCMRowGAYDVQQDExFJbnRpbm9yIENB -IFNIQTI1NjEhMB8GCSqGSIb3DQEJARYSc3VwcG9ydEBpbnRpbm9yLnNlMB4XDTE2 -MDQwNzEzMTYyOVoXDTI2MDQwNTEzMTYyOVowYTELMAkGA1UEBhMCU0UxEzARBgNV -BAoTCkludGlub3IgQUIxGjAYBgNVBAMTEUludGlub3IgQ0EgU0hBMjU2MSEwHwYJ -KoZIhvcNAQkBFhJzdXBwb3J0QGludGlub3Iuc2UwggIiMA0GCSqGSIb3DQEBAQUA -A4ICDwAwggIKAoICAQDbYYeOzbyKAy07BqxYl3m0Imd9QUaQpN0duCBpUHKC5PMb -KK0vBi0syvW7OJfkwFRsofT2voa9vNjdHyvw8BLONDp5D5Z+2kF3Wd39EvtCks2J -Up/ZSpqu9HwK+jkpOQePxDHHbAEFzSda+n2vXly4dX4iu6qdJqciN2NflqAoCimH -X602pOq8RFvP6oRUE+CDLD9gEkbjEBO88WP2ULEFDCPfa7PuQW0KvX2HPGmFnFEO -i+EqvEbD+caBmuqaA6ZHQF+1y5D9In7yN5BoejNudQHKbV9xzof6cA5lumNQxZ63 -NhQFUQNiqD8TH/LsR1tWwwt9idcIApp3Ox6HRQPTozR/OAK27Cs15O1fyG7nm6/A -Q5ynz1I6JQe/2mm/RCfRK+MK3+9Pz4snXWURdSdFS/9gZhdSjm2YQdjY5NtsZbsV -AWM1aUzW/EwYTWKzUlsfgA6LNENZD+HHT+4alUEtkXVoVf01cwPK1OrRm7G0fYYC -hGnltZ4AyyVqyfcJxJQ8y5llmwEqHqUAaIPyGjBO4gK9eeUdsB3QCerQRi8Dkeoq -NrCuwr4miKoR9ifArbMDUqx5UCDkYX55VaX0WdfcZuSZGAryEGD4Qpm56iyfA45D -HGB1WYufqNkxUcH6Q6mP39XLSEhNfoClYRSs0Z+HKrOXFhUe5p6nzEjURUIqMQID -AQABo4HTMIHQMB0GA1UdDgQWBBTNb6CdfXYWe7eGH0e7QCypsafSaDCBkwYDVR0j -BIGLMIGIgBTNb6CdfXYWe7eGH0e7QCypsafSaKFlpGMwYTELMAkGA1UEBhMCU0Ux -EzARBgNVBAoTCkludGlub3IgQUIxGjAYBgNVBAMTEUludGlub3IgQ0EgU0hBMjU2 -MSEwHwYJKoZIhvcNAQkBFhJzdXBwb3J0QGludGlub3Iuc2WCCQDr+oQ673A3mjAM -BgNVHRMEBTADAQH/MAsGA1UdDwQEAwIBBjANBgkqhkiG9w0BAQsFAAOCAgEAeRZa -ItfnhHAaor0J0Pb16PKHuSHFGM4RjpNtFmIKWAxqz7pjFLxyuqX9NfnyN+XAs+20 -8xYfmjJyQVk+efhK3znkm8zkURtbJoEUPefOqjMuNBk4SgV8tRQtfsETOFSOkMbf -KWxUlwBO+jLmcwcMZ/TfHwBICI6UWRd77lAvawlqeiJatmh+MLq7Y2p0NU4l4kTQ -UqvZhkKh1Lgr5JXCiN5AlyID5obiXUKe18DEIc3re/NvU8/sZZpi9SZX8QpHHtGs -IHrplAIIFPO/dcQy2cjGTo2JS3puLOGcfyjrkEVKybbG3/ZsQ7MZXdw//xwyrss7 -ogFHkrgopgpcMper2UiPdZQp+yMp/UNNAdQZkEcOv1G9Dtt3RcXDHCvjGKdIsd9Y -lNGnH07xMb94Jpr/6A25EvvkX9Wl/yRzSIvn9jsbclfLOsCT4sOVOMtn/dfnM+By -EslV5SmsP61S2tPm9g/zmY3fLY29ikC6Vf2sBI0k5JJ2Y6NgZpyxLBj3q9wrFfw1 -ihaWUZW7EoV/iK7CwtTAqevj4q0jtEotpx2nQXJdPfeXrkmx7XDa0RcuxJzJh85M -vU8XA8tyvJYyemTCXpDCzkq7iusGT3ySlb0lvH8ohIp9skg0FYykk5l7bwy6w8c/ -Bwdofvam9KripJPzMFF/x/zuGVHAzSCjgtbAKx4= ------END CERTIFICATE----- - diff --git a/python_examples/direkt.py b/python_examples/direkt.py index 9a19228..e42b45f 100644 --- a/python_examples/direkt.py +++ b/python_examples/direkt.py @@ -1,27 +1,96 @@ -"""The "direkt" module provides a best-practice connection mode to your Intinor -Direkt unit's API. The first connection attempt for each request will be done -with the "requests" library requiring a valid certificate. If this does not -succeed the second connection attempt will be done without strict hostname -checking using an Intinor issued HTTPS certificate. - -While we recommend using the "direkt" module, alternatives are available. An -alternative is connecting to Direkt units through ISS explicitly using the -"requests" library. If you wish instead to directly connect to your unit -without using the "direkt" module (and not connect through ISS), e.g. under use -of a third-party certificate, please contact Intinor support. - -Contact Intinor support for more information on Direkt unit usage and how to -secure your API infrastructure. +"""The "direkt" module provides a best-practice connection mode to +your Intinor Direkt unit's API. The first connection attempt for each +request will be done with the "requests" library requiring a valid +certificate. If this does not succeed the second attempt uses a +trust-on-first-use (TOFU) model: the server's TLS certificate +fingerprint is compared against a local known-hosts store. On first +contact the user is prompted to accept the fingerprint; on subsequent +connections it is verified automatically. If a fingerprint mismatch is +detected the user is prompted to accept the new fingerprint. + +While we recommend using the "direkt" module or something similar, +alternatives are available. An alternative is connecting to Direkt +units through ISS explicitly using the "requests" library. If you wish +instead to directly connect to your unit without using the "direkt" +module (and not connect through ISS), e.g. under use of a third-party +certificate, please contact Intinor support. + +Contact Intinor support for more information on Direkt unit usage and +how to secure your API infrastructure. """ +import hashlib +import json import os +import ssl +import warnings import requests +import urllib3 +from urllib3.poolmanager import PoolManager + +# Path to the known-hosts fingerprint store (next to this file). +_KNOWN_HOSTS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), + 'known_hosts.json') + + +def _load_known_hosts(): + """Load the known-hosts fingerprint store from disk.""" + if os.path.isfile(_KNOWN_HOSTS_PATH): + with open(_KNOWN_HOSTS_PATH, 'r') as f: + return json.load(f) + return {} + + +def _save_known_hosts(known_hosts): + """Persist the known-hosts fingerprint store to disk.""" + with open(_KNOWN_HOSTS_PATH, 'w') as f: + json.dump(known_hosts, f, indent=2, sort_keys=True) + + +def _fingerprint_from_der(der): + """Return the SHA-256 hex digest of a DER-encoded certificate.""" + return hashlib.sha256(der).hexdigest() + + +def _verify_tofu_fingerprint(fingerprint, host, port): + """Check *fingerprint* against the known-hosts store for *host*:*port*. + + * First contact – prompt the user to accept the fingerprint. + * Known & matching – pass silently. + * Known & mismatched – prompt the user to accept the new fingerprint. + + Raises ``RuntimeError`` if the user rejects the fingerprint. + """ + known_hosts = _load_known_hosts() + key = f"{host}:{port}" + + if key not in known_hosts: + print(f"\nUnknown host '{host}:{port}'.") + print(f" SHA-256 fingerprint: {fingerprint}") + answer = input("Trust this certificate? [y/N] ").strip().lower() + if answer != 'y': + raise RuntimeError( + f"Certificate for '{host}:{port}' rejected by user.") + known_hosts[key] = fingerprint + _save_known_hosts(known_hosts) + + elif known_hosts[key] != fingerprint: + print(f"\n*** WARNING: Certificate fingerprint for '{host}:{port}' " + "has CHANGED! ***") + print(f" Previously trusted: {known_hosts[key]}") + print(f" Current: {fingerprint}") + answer = input("Accept the new certificate? [y/N] ").strip().lower() + if answer != 'y': + raise RuntimeError( + f"Certificate change for '{host}:{port}' rejected by user.") + known_hosts[key] = fingerprint + _save_known_hosts(known_hosts) def request(method, url, **kwargs): """Sends a request trying default certificate validation and if that does - not succeed a retry is made with a custom certificate handler that - validates against a factory default custom Intinor CA signed certificate. + not succeed a retry is made using trust-on-first-use fingerprint + verification. """ with requests.Session() as session: @@ -30,9 +99,9 @@ def request(method, url, **kwargs): return session.request(method=method, url=url, **kwargs) except requests.exceptions.SSLError: - # Retry using a factory default custom Intinor CA signed - # certificate. - session.mount('https://', _DirektCheckingAdapter()) + # Fallback: TOFU fingerprint verification performed + # inline by the adapter during the actual TLS handshake. + session.mount('https://', _TofuAdapter()) return session.request(method=method, url=url, **kwargs) @@ -79,26 +148,41 @@ def delete(url, **kwargs): return request('delete', url, **kwargs) -class _DirektCheckingAdapter(requests.adapters.HTTPAdapter): - """Custom hostname / CA checking adapter for direct access to a Direkt - unit's API +class _TofuAdapter(requests.adapters.HTTPAdapter): + """HTTPS adapter that verifies the server certificate fingerprint using + trust-on-first-use during the actual request connection. """ - def __init__(self): - super().__init__() - this_path = os.path.abspath(__file__) - - # The cacert.pem file is required to be in the same directory as the - # direkt.py file. It verifies the default certificate that is installed - # on Direkt units. - self.intinor_ca = os.path.dirname(this_path) + '/cacert.pem' - - def cert_verify(self, conn, url, verify, cert): - """If your Direkt unit does not have a valid DNS name or HTTPS - certificate we offer an alternative certificate validation method using - an Intinor issued HTTPS certificate without strict hostname checking. - """ - - verify = self.intinor_ca - conn.assert_hostname = False - return super().cert_verify(conn, url, verify, cert) + def init_poolmanager(self, *args, **kwargs): + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + self.poolmanager = PoolManager(*args, ssl_context=ctx, **kwargs) + + def send(self, request, stream=False, timeout=None, + verify=True, cert=None, proxies=None): + # Disable requests-level verification; we verify the fingerprint + # ourselves after the TLS handshake completes. Suppress the + # InsecureRequestWarning that urllib3 emits for verify=False since + # the TOFU adapter performs its own certificate verification. + with warnings.catch_warnings(): + warnings.filterwarnings( + 'ignore', + category=urllib3.exceptions.InsecureRequestWarning) + response = super().send(request, stream=stream, timeout=timeout, + verify=False, cert=cert, proxies=proxies) + + # Extract the peer certificate from the underlying socket. + sock = response.raw._connection.sock + der = sock.getpeercert(binary_form=True) + fingerprint = _fingerprint_from_der(der) + + from urllib.parse import urlparse + parsed = urlparse(response.url) + host = parsed.hostname + port = parsed.port or 443 + + # This will prompt on first use or on change, and raise on rejection. + _verify_tofu_fingerprint(fingerprint, host, port) + + return response