Skip to content

Commit af7565a

Browse files
Add DIGEST-MD5 SASL delegation token auth to HiveCatalog
Enable PyIceberg's HiveCatalog to authenticate using DIGEST-MD5 SASL with delegation tokens from $HADOOP_TOKEN_FILE_LOCATION, which is the standard mechanism in secure Hadoop environments. This unblocks PyIceberg adoption in production clusters that don't use Kerberos directly. - Add HiveAuthError exception for Hive-specific auth failures - Add hadoop_credentials module to parse HDTS binary token files - Add _DigestMD5SaslTransport to work around THRIFT-5926 (None initial response) - Support hive.metastore.authentication property (NONE/KERBEROS/DIGEST-MD5) - Add pure-sasl to hive extras in pyproject.toml - Backward compatible: existing kerberos_auth boolean still works Closes #3145 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b98de51 commit af7565a

8 files changed

Lines changed: 532 additions & 15 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,9 @@ catalog:
685685
| hive.kerberos-authentication | true | Using authentication via Kerberos |
686686
| hive.kerberos-service-name | hive | Kerberos service name (default hive) |
687687
| ugi | t-1234:secret | Hadoop UGI for Hive client. |
688+
| hive.metastore.authentication | DIGEST-MD5 | Auth mechanism: `NONE` (default), `KERBEROS`, or `DIGEST-MD5` |
689+
690+
When using DIGEST-MD5 authentication, PyIceberg reads a Hive delegation token from the file pointed to by the `$HADOOP_TOKEN_FILE_LOCATION` environment variable. This is the standard mechanism used in secure Hadoop environments where delegation tokens are distributed to jobs. Install PyIceberg with `pip install "pyiceberg[hive]"` to get the required `pure-sasl` dependency (imported as `puresasl`).
688691

689692
When using Hive 2.x, make sure to set the compatibility flag:
690693

pyiceberg/catalog/hive.py

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
)
6464
from pyiceberg.exceptions import (
6565
CommitFailedException,
66+
HiveAuthError,
6667
NamespaceAlreadyExistsError,
6768
NamespaceNotEmptyError,
6869
NoSuchIcebergTableError,
@@ -109,6 +110,7 @@
109110
UnknownType,
110111
UUIDType,
111112
)
113+
from pyiceberg.utils.hadoop_credentials import read_hive_delegation_token
112114
from pyiceberg.utils.properties import property_as_bool, property_as_float
113115

114116
if TYPE_CHECKING:
@@ -127,6 +129,9 @@
127129
HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name"
128130
HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive"
129131

132+
HIVE_METASTORE_AUTH = "hive.metastore.authentication"
133+
HIVE_METASTORE_AUTH_DEFAULT = "NONE"
134+
130135
LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
131136
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
132137
LOCK_CHECK_RETRIES = "lock-check-retries"
@@ -139,6 +144,33 @@
139144
logger = logging.getLogger(__name__)
140145

141146

147+
class _DigestMD5SaslTransport(TTransport.TSaslClientTransport):
    """TSaslClientTransport subclass that works around THRIFT-5926.

    ``TSaslClientTransport.open()`` forwards the first ``sasl.process()``
    result straight into ``_send_sasl_message()``. DIGEST-MD5 is a
    challenge-first mechanism, so that first result is ``None``, and
    sending ``None`` raises a ``TypeError``. While the handshake runs we
    wrap ``sasl.process`` so a ``None`` response is replaced with an
    empty byte string, then restore the unwrapped method afterwards.
    """

    def open(self) -> None:
        # Temporarily wrap sasl.process so a None initial response becomes b"".
        unwrapped_process = self.sasl.process

        def _coercing_process(challenge: bytes | None = None) -> bytes | None:
            response = unwrapped_process(challenge)
            return b"" if response is None else response

        self.sasl.process = _coercing_process
        try:
            super().open()
        finally:
            # Restore even if the handshake failed, so retries see the
            # unpatched SASL client.
            self.sasl.process = unwrapped_process
172+
173+
142174
class _HiveClient:
143175
"""Helper class to nicely open and close the transport."""
144176

@@ -151,20 +183,41 @@ def __init__(
151183
ugi: str | None = None,
152184
kerberos_auth: bool | None = HIVE_KERBEROS_AUTH_DEFAULT,
153185
kerberos_service_name: str | None = HIVE_KERBEROS_SERVICE_NAME,
186+
auth_mechanism: str | None = None,
154187
):
155188
self._uri = uri
156-
self._kerberos_auth = kerberos_auth
157189
self._kerberos_service_name = kerberos_service_name
158190
self._ugi = ugi.split(":") if ugi else None
191+
192+
# Resolve auth mechanism: explicit auth_mechanism takes precedence,
193+
# then fall back to legacy kerberos_auth boolean for backward compat.
194+
if auth_mechanism is not None:
195+
self._auth_mechanism = auth_mechanism.upper()
196+
elif kerberos_auth:
197+
self._auth_mechanism = "KERBEROS"
198+
else:
199+
self._auth_mechanism = HIVE_METASTORE_AUTH_DEFAULT
200+
159201
self._transport = self._init_thrift_transport()
160202

161203
def _init_thrift_transport(self) -> TTransport:
162204
url_parts = urlparse(self._uri)
163205
socket = TSocket.TSocket(url_parts.hostname, url_parts.port)
164-
if not self._kerberos_auth:
165-
return TTransport.TBufferedTransport(socket)
166-
else:
206+
207+
if self._auth_mechanism == "KERBEROS":
167208
return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service=self._kerberos_service_name)
209+
elif self._auth_mechanism == "DIGEST-MD5":
210+
identifier, password = read_hive_delegation_token()
211+
return _DigestMD5SaslTransport(
212+
socket,
213+
host=url_parts.hostname,
214+
service=self._kerberos_service_name,
215+
mechanism="DIGEST-MD5",
216+
username=identifier,
217+
password=password,
218+
)
219+
else:
220+
return TTransport.TBufferedTransport(socket)
168221

169222
def _client(self) -> Client:
170223
protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
@@ -319,6 +372,7 @@ def _create_hive_client(properties: dict[str, str]) -> _HiveClient:
319372
properties.get("ugi"),
320373
property_as_bool(properties, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_AUTH_DEFAULT),
321374
properties.get(HIVE_KERBEROS_SERVICE_NAME, HIVE_KERBEROS_SERVICE_NAME_DEFAULT),
375+
auth_mechanism=properties.get(HIVE_METASTORE_AUTH),
322376
)
323377
except BaseException as e:
324378
last_exception = e

pyiceberg/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,7 @@ class WaitingForLockException(Exception):
130130

131131
class ValidationException(Exception):
132132
"""Raised when validation fails."""
133+
134+
135+
class HiveAuthError(Exception):
    """Raised when Hive Metastore authentication fails.

    Used by the delegation-token reader for a missing or malformed
    Hadoop token file (``$HADOOP_TOKEN_FILE_LOCATION``).
    """
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Hadoop Delegation Token Service (HDTS) file parser.
19+
20+
Reads delegation tokens from the binary token file pointed to by
21+
the ``$HADOOP_TOKEN_FILE_LOCATION`` environment variable.
22+
"""
23+
24+
from __future__ import annotations
25+
26+
import base64
27+
import os
28+
from io import BytesIO
29+
30+
from pyiceberg.exceptions import HiveAuthError
31+
32+
HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
33+
HIVE_DELEGATION_TOKEN_KIND = "HIVE_DELEGATION_TOKEN"
34+
HDTS_MAGIC = b"HDTS"
35+
HDTS_SUPPORTED_VERSION = 0
36+
37+
38+
def _read_hadoop_vint(stream: BytesIO) -> int:
39+
"""Decode a Hadoop WritableUtils VInt/VLong from a byte stream."""
40+
first = stream.read(1)
41+
if not first:
42+
raise HiveAuthError("Unexpected end of token file while reading VInt")
43+
b = first[0]
44+
if b <= 0x7F:
45+
return b
46+
# Number of additional bytes is encoded in leading 1-bits
47+
num_extra = 0
48+
mask = 0x80
49+
while b & mask:
50+
num_extra += 1
51+
mask >>= 1
52+
# First byte contributes the remaining bits
53+
result = b & (mask - 1)
54+
extra = stream.read(num_extra)
55+
if len(extra) != num_extra:
56+
raise HiveAuthError("Unexpected end of token file while reading VInt")
57+
for byte in extra:
58+
result = (result << 8) | byte
59+
# Sign-extend if negative (high bit of decoded value is set)
60+
if result >= (1 << (8 * num_extra + (8 - num_extra - 1) - 1)):
61+
result -= 1 << (8 * num_extra + (8 - num_extra - 1))
62+
return result
63+
64+
65+
def _read_hadoop_bytes(stream: BytesIO) -> bytes:
    """Read a length-prefixed byte array (VInt length, then raw bytes)."""
    declared_len = _read_hadoop_vint(stream)
    if declared_len < 0:
        raise HiveAuthError(f"Invalid byte array length: {declared_len}")
    payload = stream.read(declared_len)
    if len(payload) == declared_len:
        return payload
    raise HiveAuthError("Unexpected end of token file while reading byte array")
74+
75+
76+
def _read_hadoop_text(stream: BytesIO) -> str:
    """Read a Hadoop ``Text`` value: a VInt length followed by UTF-8 bytes."""
    raw = _read_hadoop_bytes(stream)
    return raw.decode("utf-8")
79+
80+
81+
def read_hive_delegation_token() -> tuple[str, str]:
    """Read a Hive delegation token from ``$HADOOP_TOKEN_FILE_LOCATION``.

    Parses the Hadoop ``Credentials`` token-storage format: the 4-byte
    ``HDTS`` magic, a version byte, a VInt token count, then one entry
    per token. Per ``org.apache.hadoop.security.Credentials.write``,
    each entry is an alias ``Text`` followed by the serialized ``Token``
    (identifier bytes, password bytes, kind ``Text``, service ``Text``).
    The previous implementation skipped the alias, misaligning every
    subsequent field.

    Returns:
        A ``(identifier, password)`` tuple where both values are
        base64-encoded strings suitable for SASL DIGEST-MD5 auth.

    Raises:
        HiveAuthError: If the token file is missing, malformed, or
            does not contain a ``HIVE_DELEGATION_TOKEN``.
    """
    token_file = os.environ.get(HADOOP_TOKEN_FILE_LOCATION)
    if not token_file:
        raise HiveAuthError(
            f"${HADOOP_TOKEN_FILE_LOCATION} environment variable is not set. "
            "A Hadoop delegation token file is required for DIGEST-MD5 authentication."
        )

    try:
        with open(token_file, "rb") as f:
            data = f.read()
    except FileNotFoundError as err:
        # Chain the cause so the original OS error is preserved.
        raise HiveAuthError(f"Hadoop token file not found: {token_file}") from err

    stream = BytesIO(data)

    magic = stream.read(4)
    if magic != HDTS_MAGIC:
        raise HiveAuthError(f"Invalid Hadoop token file magic: expected {HDTS_MAGIC!r}, got {magic!r}")

    version_byte = stream.read(1)
    if not version_byte:
        raise HiveAuthError("Unexpected end of token file while reading version")
    version = version_byte[0]
    if version != HDTS_SUPPORTED_VERSION:
        raise HiveAuthError(f"Unsupported Hadoop token file version: {version}")

    num_tokens = _read_hadoop_vint(stream)

    for _ in range(num_tokens):
        # Each entry: alias_text, then the Token itself:
        # identifier_bytes, password_bytes, kind_text, service_text.
        _alias = _read_hadoop_text(stream)
        identifier_bytes = _read_hadoop_bytes(stream)
        password_bytes = _read_hadoop_bytes(stream)
        kind = _read_hadoop_text(stream)
        _service = _read_hadoop_text(stream)

        if kind == HIVE_DELEGATION_TOKEN_KIND:
            # Hive's SASL server expects base64-encoded identifier/password.
            return (
                base64.b64encode(identifier_bytes).decode("ascii"),
                base64.b64encode(password_bytes).decode("ascii"),
            )

    raise HiveAuthError(
        f"No {HIVE_DELEGATION_TOKEN_KIND} found in token file: {token_file}. "
        f"File contains {num_tokens} token(s)."
    )

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ bodo = ["bodo>=2025.7.4"]
7474
daft = ["daft>=0.5.0"]
7575
polars = ["polars>=1.21.0,<2"]
7676
snappy = ["python-snappy>=0.6.0,<1.0.0"]
77-
hive = ["thrift>=0.13.0,<1.0.0"]
77+
hive = [
78+
"thrift>=0.13.0,<1.0.0",
79+
"pure-sasl>=0.6.0",
80+
]
7881
hive-kerberos = [
7982
"thrift>=0.13.0,<1.0.0",
8083
"thrift-sasl>=0.4.3",

0 commit comments

Comments
 (0)