Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ Notes:

The "direkt" module provides a best-practice connection mode to your Intinor
Direkt unit's API. It wraps the "requests" library with some convenient
functionality which includes certificate handling.

The cacert.pem file is required to be in the same directory as the
direkt.py file. It verifies the default certificate that is installed
on Direkt units.

functionality which includes certificate trust handling.

Comment on lines +29 to 30
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

README now only mentions “certificate trust handling” but doesn’t describe the new TOFU behavior (first-contact prompt, fingerprint mismatch prompt) or where the fingerprint store is kept. This doesn’t fully reflect the new trust model described in the PR; please document the TOFU workflow and the known_hosts.json location/behavior so users understand the prompts and how to reset trust.

Suggested change
functionality which includes certificate trust handling.
functionality which includes certificate trust handling using a
Trust-On-First-Use (TOFU) model.
On first contact with a Direkt unit, the module will show you the unit's TLS
certificate fingerprint and prompt you to either trust or reject it. If you
choose to trust it, the fingerprint is stored locally in a file named
`known_hosts.json` in your working directory. Future connections to the same
host are automatically validated against the stored fingerprint without
prompting.
If the server's fingerprint ever changes, the module will warn you about the
mismatch and prompt you again before proceeding. This helps you detect
potential man-in-the-middle attacks or reconfigured units.
To reset trust for one or more units, edit or delete the corresponding
fingerprints in `known_hosts.json` (or remove the file entirely to clear all
remembered hosts). The file will be recreated automatically the next time you
accept a new fingerprint.

Copilot uses AI. Check for mistakes.
For more information visit:
intinor.com
Expand Down
36 changes: 0 additions & 36 deletions python_examples/cacert.pem

This file was deleted.

164 changes: 124 additions & 40 deletions python_examples/direkt.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,96 @@
"""The "direkt" module provides a best-practice connection mode to your Intinor
Direkt unit's API. The first connection attempt for each request will be done
with the "requests" library requiring a valid certificate. If this does not
succeed the second connection attempt will be done without strict hostname
checking using an Intinor issued HTTPS certificate.

While we recommend using the "direkt" module, alternatives are available. An
alternative is connecting to Direkt units through ISS explicitly using the
"requests" library. If you wish instead to directly connect to your unit
without using the "direkt" module (and not connect through ISS), e.g. under use
of a third-party certificate, please contact Intinor support.

Contact Intinor support for more information on Direkt unit usage and how to
secure your API infrastructure.
"""The "direkt" module provides a best-practice connection mode to
your Intinor Direkt unit's API. The first connection attempt for each
request will be done with the "requests" library requiring a valid
certificate. If this does not succeed the second attempt uses a
trust-on-first-use (TOFU) model: the server's TLS certificate
fingerprint is compared against a local known-hosts store. On first
contact the user is prompted to accept the fingerprint; on subsequent
connections it is verified automatically. If a fingerprint mismatch is
detected the user is prompted to accept the new fingerprint.

While we recommend using the "direkt" module or something similar,
alternatives are available. An alternative is connecting to Direkt
units through ISS explicitly using the "requests" library. If you wish
instead to directly connect to your unit without using the "direkt"
module (and not connect through ISS), e.g. under use of a third-party
certificate, please contact Intinor support.

Contact Intinor support for more information on Direkt unit usage and
how to secure your API infrastructure.
"""

import hashlib
import json
import os
import ssl
import warnings
import requests
import urllib3
from urllib3.poolmanager import PoolManager

# Path to the known-hosts fingerprint store (next to this file).
_KNOWN_HOSTS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'known_hosts.json')


def _load_known_hosts():
"""Load the known-hosts fingerprint store from disk."""
if os.path.isfile(_KNOWN_HOSTS_PATH):
with open(_KNOWN_HOSTS_PATH, 'r') as f:
return json.load(f)
return {}


def _save_known_hosts(known_hosts):
"""Persist the known-hosts fingerprint store to disk."""
with open(_KNOWN_HOSTS_PATH, 'w') as f:
json.dump(known_hosts, f, indent=2, sort_keys=True)
Comment on lines +31 to +47
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The known-hosts store is written next to the module file. In common setups (installed package/site-packages, read-only container image) this path is not writable and TOFU will fail with a PermissionError. Consider storing the file in a per-user config/data directory instead, and write it atomically with restrictive permissions (e.g. 0600). Also handle json.JSONDecodeError so a corrupted store doesn’t crash all requests and can be recovered from.

Suggested change
# Path to the known-hosts fingerprint store (next to this file).
_KNOWN_HOSTS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'known_hosts.json')
def _load_known_hosts():
"""Load the known-hosts fingerprint store from disk."""
if os.path.isfile(_KNOWN_HOSTS_PATH):
with open(_KNOWN_HOSTS_PATH, 'r') as f:
return json.load(f)
return {}
def _save_known_hosts(known_hosts):
"""Persist the known-hosts fingerprint store to disk."""
with open(_KNOWN_HOSTS_PATH, 'w') as f:
json.dump(known_hosts, f, indent=2, sort_keys=True)
# Path to the known-hosts fingerprint store, stored in a per-user
# configuration directory instead of next to the module file.
if os.name == 'nt':
# On Windows, prefer %APPDATA%, fall back to the user's home directory.
_base_config_dir = os.environ.get('APPDATA', os.path.expanduser('~'))
_KNOWN_HOSTS_PATH = os.path.join(
_base_config_dir,
'Intinor',
'direkt_known_hosts.json',
)
else:
# On POSIX, prefer $XDG_CONFIG_HOME, fall back to ~/.config.
_base_config_dir = os.environ.get(
'XDG_CONFIG_HOME',
os.path.join(os.path.expanduser('~'), '.config'),
)
_KNOWN_HOSTS_PATH = os.path.join(
_base_config_dir,
'intinor',
'direkt_known_hosts.json',
)
def _load_known_hosts():
"""Load the known-hosts fingerprint store from disk."""
path = _KNOWN_HOSTS_PATH
if os.path.isfile(path):
try:
with open(path, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, ValueError):
# Corrupted JSON – warn and recover by returning an empty store.
warnings.warn(
f"Known-hosts store at '{path}' is corrupted; resetting.",
RuntimeWarning,
)
return {}
except OSError as exc:
# I/O problem (e.g. permission error) – warn and continue
# without known hosts rather than crashing all requests.
warnings.warn(
f"Could not read known-hosts store at '{path}': {exc}",
RuntimeWarning,
)
return {}
return {}
def _save_known_hosts(known_hosts):
"""Persist the known-hosts fingerprint store to disk atomically."""
path = _KNOWN_HOSTS_PATH
directory = os.path.dirname(path)
try:
if directory:
os.makedirs(directory, exist_ok=True)
tmp_path = path + '.tmp'
# Open the temporary file with restrictive permissions (0600).
fd = os.open(
tmp_path,
os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
0o600,
)
try:
with os.fdopen(fd, 'w') as f:
json.dump(known_hosts, f, indent=2, sort_keys=True)
finally:
# If json.dump failed before fd was wrapped, ensure the fd is
# closed. If it was wrapped, the context manager above handles it.
if not f.closed:
try:
os.close(fd)
except OSError:
pass
os.replace(tmp_path, path)
except OSError as exc:
# Do not crash the application if we cannot persist the store.
warnings.warn(
f"Could not write known-hosts store to '{path}': {exc}",
RuntimeWarning,
)

Copilot uses AI. Check for mistakes.


def _fingerprint_from_der(der):
"""Return the SHA-256 hex digest of a DER-encoded certificate."""
return hashlib.sha256(der).hexdigest()


def _verify_tofu_fingerprint(fingerprint, host, port):
"""Check *fingerprint* against the known-hosts store for *host*:*port*.

* First contact – prompt the user to accept the fingerprint.
* Known & matching – pass silently.
* Known & mismatched – prompt the user to accept the new fingerprint.

Raises ``RuntimeError`` if the user rejects the fingerprint.
"""
known_hosts = _load_known_hosts()
key = f"{host}:{port}"

if key not in known_hosts:
print(f"\nUnknown host '{host}:{port}'.")
print(f" SHA-256 fingerprint: {fingerprint}")
answer = input("Trust this certificate? [y/N] ").strip().lower()
if answer != 'y':
raise RuntimeError(
f"Certificate for '{host}:{port}' rejected by user.")
known_hosts[key] = fingerprint
_save_known_hosts(known_hosts)

elif known_hosts[key] != fingerprint:
print(f"\n*** WARNING: Certificate fingerprint for '{host}:{port}' "
"has CHANGED! ***")
print(f" Previously trusted: {known_hosts[key]}")
print(f" Current: {fingerprint}")
answer = input("Accept the new certificate? [y/N] ").strip().lower()
if answer != 'y':
Comment on lines +68 to +83
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using input() inside the request path can raise EOFError / block indefinitely in non-interactive contexts (CI, services, piped stdin). Expose a non-interactive control (e.g. callback/flag/env var) and fail with a clear exception when prompting isn’t possible, so callers can handle first-trust and mismatch cases programmatically.

Copilot uses AI. Check for mistakes.
raise RuntimeError(
f"Certificate change for '{host}:{port}' rejected by user.")
known_hosts[key] = fingerprint
Comment on lines +71 to +86
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_verify_tofu_fingerprint raises RuntimeError on rejection/change. Since this module is a thin wrapper around requests, callers are likely to expect requests.exceptions.* on connection/validation failures (as in the first attempt). Consider raising a Requests/urllib3 SSLError (or a dedicated, documented exception type from this module) so error handling stays consistent and actionable.

Copilot uses AI. Check for mistakes.
_save_known_hosts(known_hosts)


def request(method, url, **kwargs):
"""Sends a request trying default certificate validation and if that does
not succeed a retry is made with a custom certificate handler that
validates against a factory default custom Intinor CA signed certificate.
not succeed a retry is made using trust-on-first-use fingerprint
verification.
"""

with requests.Session() as session:
Expand All @@ -30,9 +99,9 @@ def request(method, url, **kwargs):
return session.request(method=method, url=url, **kwargs)

except requests.exceptions.SSLError:
# Retry using a factory default custom Intinor CA signed
# certificate.
session.mount('https://', _DirektCheckingAdapter())
# Fallback: TOFU fingerprint verification performed
# inline by the adapter during the actual TLS handshake.
session.mount('https://', _TofuAdapter())
return session.request(method=method, url=url, **kwargs)


Expand Down Expand Up @@ -79,26 +148,41 @@ def delete(url, **kwargs):
return request('delete', url, **kwargs)


class _DirektCheckingAdapter(requests.adapters.HTTPAdapter):
"""Custom hostname / CA checking adapter for direct access to a Direkt
unit's API
class _TofuAdapter(requests.adapters.HTTPAdapter):
"""HTTPS adapter that verifies the server certificate fingerprint using
trust-on-first-use during the actual request connection.
"""

def __init__(self):
super().__init__()
this_path = os.path.abspath(__file__)

# The cacert.pem file is required to be in the same directory as the
# direkt.py file. It verifies the default certificate that is installed
# on Direkt units.
self.intinor_ca = os.path.dirname(this_path) + '/cacert.pem'

def cert_verify(self, conn, url, verify, cert):
"""If your Direkt unit does not have a valid DNS name or HTTPS
certificate we offer an alternative certificate validation method using
an Intinor issued HTTPS certificate without strict hostname checking.
"""

verify = self.intinor_ca
conn.assert_hostname = False
return super().cert_verify(conn, url, verify, cert)
def init_poolmanager(self, *args, **kwargs):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.poolmanager = PoolManager(*args, ssl_context=ctx, **kwargs)

def send(self, request, stream=False, timeout=None,
verify=True, cert=None, proxies=None):
# Disable requests-level verification; we verify the fingerprint
# ourselves after the TLS handshake completes. Suppress the
# InsecureRequestWarning that urllib3 emits for verify=False since
# the TOFU adapter performs its own certificate verification.
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
category=urllib3.exceptions.InsecureRequestWarning)
response = super().send(request, stream=stream, timeout=timeout,
verify=False, cert=cert, proxies=proxies)

# Extract the peer certificate from the underlying socket.
sock = response.raw._connection.sock
Comment on lines +175 to +176
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on the private attribute response.raw._connection.sock to access the underlying socket. That attribute is not part of Requests/urllib3’s public API and can be None or renamed across versions (e.g. urllib3 2.x), leading to runtime AttributeError. Prefer a supported mechanism for fingerprint verification (such as urllib3’s built-in fingerprint assertion during connect) or obtain the peer cert through a public connection hook rather than introspecting private fields.

Suggested change
# Extract the peer certificate from the underlying socket.
sock = response.raw._connection.sock
# Extract the peer certificate from the underlying socket using
# public attributes only. Fall back with a clear error if the
# socket is not available, rather than relying on private fields.
conn = getattr(response.raw, "connection", None)
sock = getattr(conn, "sock", None) if conn is not None else None
if sock is None:
raise RuntimeError(
"Cannot access underlying TLS socket to verify certificate "
"fingerprint; urllib3 connection has no active socket."
)

Copilot uses AI. Check for mistakes.
der = sock.getpeercert(binary_form=True)
fingerprint = _fingerprint_from_der(der)

from urllib.parse import urlparse
parsed = urlparse(response.url)
host = parsed.hostname
port = parsed.port or 443

# This will prompt on first use or on change, and raise on rejection.
_verify_tofu_fingerprint(fingerprint, host, port)
Comment on lines +172 to +186
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TOFU fingerprint check happens after super().send(...) returns, which means the HTTP request (including any Authorization headers / credentials) may already have been sent over a TLS connection that has not yet been trusted. This defeats the purpose of TOFU and can leak secrets to a MITM on first contact or after a fingerprint change, and it also won’t protect intermediate redirect hops. Restructure so the certificate is fetched and the fingerprint is validated (and the user decision recorded) before any request bytes are transmitted, e.g. by performing a preflight TLS connect to obtain the cert/fingerprint and then enforcing it during the actual request (or by using urllib3’s fingerprint assertion during connect for the request once the fingerprint is known).

Copilot uses AI. Check for mistakes.

return response