fix(spark): GSS initiate failed on HMS executors; spark.sql.catalog read options not applied #476
Conversation
Related: #353. cc @LuciferYang @hamersaw, could you please take a look? Thank you.
Force-pushed from 290f2f9 to b832b6f:

…ptions not taking effect

Add `executor_credential_refresh` (default `true`). When `false`, skip the executor-side namespace rebuild to avoid `describeTable()` on a Kerberized HMS without a TGT (`GSS initiate failed` on fragment scans). Parse typed read flags in `Builder.withCatalogDefaults()` so keys under `spark.sql.catalog.<name>` (e.g. `executor_credential_refresh`, `batch_size`, `push_down_filters`) apply to plain SQL and DML, not only `DataFrameReader.option` paths.
Force-pushed from b832b6f to fb46a06.
@LuciferYang Thanks for the review and for catching the `withCatalogDefaults` issue; that was a great find, and it really helped tighten the fix.
ACTION NEEDED: The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error, inspect the "PR Title Check" action.
Summary
Fixes `org.apache.thrift.transport.TTransportException: GSS initiate failed` thrown on Spark executors when reading Lance tables registered in a Kerberized Hive Metastore (both plain `SELECT` and SQL DML). This PR does two things:

1. Adds a new read option `executor_credential_refresh` (default `true`, preserving current behavior). When set to `false`, executors skip the eager `namespace.describeTable()` RPC and open the dataset directly by URI using the storage options the driver already obtained.
2. Catalog-level conf (`--conf spark.sql.catalog.<name>.executor_credential_refresh=false`) is now parsed in `withCatalogDefaults()`, so `spark.sql(...)` queries (including `SELECT` and SQL DML), which have no `spark.read.option(...)` attach point, pick up the flag the same way as DataFrameReader-based reads.

Root Cause
Since #353 removed `LanceDatasetCache`, `LanceFragmentScanner.create()` unconditionally rebuilds the `LanceNamespace` client on each Spark executor and binds it back onto `LanceSparkReadOptions`. This forces the dataset open through `Utils.OpenDatasetBuilder`'s `namespaceClient` branch, which in turn calls `OpenDatasetBuilder.buildFromNamespaceClient()` in the Lance Java SDK, and that path issues an eager `namespace.describeTable()` RPC before handing off to Rust.

For catalogs where the backing service authenticates per-call (HMS over Kerberos, some REST catalogs), Spark executors typically do not have a Kerberos TGT: the `--keytab`/`--principal` credentials only reach the driver / ApplicationMaster, while executors run with Hadoop delegation tokens that cannot be used for HMS Thrift SASL. The `describeTable` RPC therefore fails with the `GSS initiate failed` exception above.

Driver-side operations (metadata-only queries, count-via-manifest) succeed because the driver has the TGT; the failure only manifests during fragment scans.
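The branch at the heart of this failure, which the PR makes optional, can be modeled with a toy sketch. The option and branch names follow the PR text; everything else is illustrative, not the actual `LanceFragmentScanner` code:

```java
// Toy model of the executor-side open decision; only the shape of the
// branch this PR gates, not real connector code.
public class Main {
    static String chooseOpenPath(boolean executorCredentialRefresh) {
        if (executorCredentialRefresh) {
            // Default: rebuild the namespace client, which issues an eager
            // namespace.describeTable() RPC; against a Kerberized HMS this
            // needs a TGT that executors do not have.
            return "namespaceClient";
        }
        // Opt-out: open directly by URI using the initialStorageOptions the
        // driver already fetched at scan-plan time; no executor-side RPC.
        return "uri";
    }

    public static void main(String[] args) {
        System.out.println(chooseOpenPath(true));  // prints "namespaceClient"
        System.out.println(chooseOpenPath(false)); // prints "uri"
    }
}
```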
Why the Existing Behavior Exists
Rebuilding the namespace client on the executor is not dead code: it keeps the Rust `LanceNamespaceStorageOptionsProvider` attached so that short-lived vended credentials (STS tokens for S3 / GCS / Azure) returned by `describeTable()` can be refreshed when they expire mid-scan. Simply removing the rebuild would break long-running scans against object stores that use credential vending.

Fix
1. Gate the executor-side rebuild behind a new option

Add a boolean read option `executor_credential_refresh`, defaulting to `true`:

- `true` (default): unchanged; the executor rebuilds the namespace client and routes through the `namespaceClient` branch, preserving credential refresh. Safe for all existing users.
- `false`: the executor skips the rebuild; reads open via URI using the `initialStorageOptions` the driver already obtained from `describeTable()` at scan-plan time.

2. Make catalog-level conf actually reach the typed field
Before this PR, `Builder.withCatalogDefaults(catalogConfig)` only merged the storage-options map and never parsed typed flags. As a result, the catalog-conf syntax looked like it should work but silently ignored the flag. This PR extracts a `parseTypedFlags(Map<String, String>)` helper and calls it from both `fromOptions()` and `withCatalogDefaults()`, so every recognized read option (not just `executor_credential_refresh`) now flows from catalog conf into the typed field.

This is what makes the fix usable from SQL DML. Without the `withCatalogDefaults` parse, a user running `DELETE FROM kerberized_hms_lance_table WHERE id = 1` has no way to disable the rebuild: SQL DML has no per-statement `.option(...)` attach point.

Configuration surfaces after this PR
- DataFrameReader: `spark.read.option("executor_credential_refresh", "false").table(...)`
- Plain SQL (`spark.sql("SELECT ...")`) via catalog conf: `--conf spark.sql.catalog.lance.executor_credential_refresh=false` + `spark.sql("SELECT * FROM lance.db.t")`
- SQL DML via catalog conf: `--conf spark.sql.catalog.lance.executor_credential_refresh=false` + `spark.sql("DELETE FROM lance.db.t WHERE id=1")`

Intended usage for HMS + Kerberos deployments:
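A possible invocation for this deployment shape (a hedged sketch: the catalog name `lance`, keytab path, and principal are placeholders, and the catalog implementation class is not specified in this PR):

```shell
# Illustrative spark-submit for a Kerberized HMS deployment; paths, the
# principal, and the catalog class are placeholders for your environment.
spark-submit \
  --principal etl@EXAMPLE.COM \
  --keytab /etc/security/keytabs/etl.keytab \
  --conf spark.sql.catalog.lance=<your Lance catalog implementation class> \
  --conf spark.sql.catalog.lance.executor_credential_refresh=false \
  job.py
```

With this conf in place, both `spark.sql("SELECT * FROM lance.db.t")` and SQL DML pick up the flag without any per-read `.option(...)`.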
Per-Namespace Trade-off Analysis
The refresh callback is meaningful only for namespaces that actually return `storage_options` from `describeTable()`. Survey of the impls in `lance-namespace-impls`:

- `Hive2Namespace`: `setLocation` only, no `storage_options` vended.
- `Hive3Namespace`: `setLocation` only.
- `GlueNamespace`: populates `storage_options` via `config.getStorageOptions()`.
- `IcebergNamespace` (REST), `PolarisNamespace`, `UnityNamespace`: also surveyed.

Concretely, for the HMS + S3 case: HMS does not vend S3 credentials (`describeTable()` only sets `location`), so the executor's S3 access is governed entirely by the AWS SDK credential chain (instance profile / `hive-site.xml` / env vars / `~/.aws/credentials`), and the AWS SDK handles all STS rotation independently. The Lance refresh callback would have nothing to refresh, so disabling it costs nothing in practice.

Scope of Change
- `LanceSparkReadOptions.java`: new constant `CONFIG_EXECUTOR_CREDENTIAL_REFRESH`, new field `executorCredentialRefresh` (default `true`), builder / getter / `withVersion` propagation / `equals`/`hashCode`, Javadoc covering the per-namespace trade-off.
- `Builder.parseTypedFlags(Map<String, String>)`: helper extracted from the previously duplicated `fromOptions` body, now called from both `fromOptions()` and `withCatalogDefaults()`. This incidentally also fixes silent ignores of `push_down_filters`, `batch_size`, `topN_push_down`, etc. when set at the catalog level, a pre-existing latent issue uncovered while fixing the primary bug.
- `LanceFragmentScanner.java`: add `&& readOptions.isExecutorCredentialRefresh()` to the existing rebuild `if`, with an inline comment explaining the trade-off.
- `LanceSparkReadOptionsSerializationTest.java`: six new tests covering default value, map parsing, serialization round-trip, `withVersion` propagation, catalog-defaults path, and per-read override precedence.

No public API signature is changed; no existing behavior is altered for users who do not set the new option.
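The extracted helper can be sketched roughly as follows. This is a hedged illustration: the class shape, the extra flags, and all defaults except `executor_credential_refresh`'s `true` are assumptions, not the actual `Builder` code:

```java
import java.util.Map;

// Illustrative sketch of a shared typed-flag parser called from both
// fromOptions() and withCatalogDefaults(); not the real Builder class.
public class Main {
    static class Builder {
        boolean executorCredentialRefresh = true; // default preserves old behavior
        boolean pushDownFilters = true;           // assumed default, illustrative
        int batchSize = 1024;                     // assumed default, illustrative

        // One code path for every recognized typed read option, so catalog-level
        // conf and per-read options cannot drift apart.
        Builder parseTypedFlags(Map<String, String> opts) {
            if (opts.containsKey("executor_credential_refresh")) {
                executorCredentialRefresh =
                    Boolean.parseBoolean(opts.get("executor_credential_refresh"));
            }
            if (opts.containsKey("push_down_filters")) {
                pushDownFilters = Boolean.parseBoolean(opts.get("push_down_filters"));
            }
            if (opts.containsKey("batch_size")) {
                batchSize = Integer.parseInt(opts.get("batch_size"));
            }
            return this;
        }
    }

    public static void main(String[] args) {
        Builder b = new Builder().parseTypedFlags(
            Map.of("executor_credential_refresh", "false", "batch_size", "256"));
        System.out.println(b.executorCredentialRefresh); // prints "false"
        System.out.println(b.batchSize);                 // prints "256"
    }
}
```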
Test Plan
New unit tests (all 305 tests in `lance-spark-base_2.12` pass locally):

- `testExecutorCredentialRefreshDefaultsToTrue`: default value preserved.
- `testExecutorCredentialRefreshParsedFromOptions`: flag honored from both `"true"` and `"false"` map entries.
- `testExecutorCredentialRefreshSurvivesSerialization`: flag survives Java serialization (critical: it must reach the executor).
- `testExecutorCredentialRefreshPreservedByWithVersion`: flag propagated by `withVersion()` used during scan-plan version pinning.
- `testExecutorCredentialRefreshFromCatalogDefaults`: new; guards the catalog-conf path used by SQL DML.
- `testPerReadOptionOverridesCatalogDefaults`: new; pins the precedence rule "per-read `.option(...)` wins over catalog default".

Integration test (out-of-band, on an internal YARN + HMS + Kerberos + HDFS cluster):
- `spark-submit` with `--keytab`/`--principal`, Kerberized HMS, `SELECT * FROM lance_hms_table` via `lance-namespace-hive2`.
- Without the fix: `GSS initiate failed` on `describeTable`, reproducible across multiple partitions / runs.
- With `--conf spark.sql.catalog.lance.executor_credential_refresh=false`: scan completes and returns the expected row count and sample rows.
- Default (`true`) with the same fix jar: behavior unchanged.
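The round-trip property exercised by `testExecutorCredentialRefreshSurvivesSerialization` can be sketched in isolation with a simplified stand-in class (not `LanceSparkReadOptions` itself):

```java
import java.io.*;

// Simplified stand-in for LanceSparkReadOptions: the read options object is
// Java-serialized to reach executors, so new fields must survive a round-trip.
public class Main {
    static class ReadOptions implements Serializable {
        boolean executorCredentialRefresh = true;
    }

    static ReadOptions roundTrip(ReadOptions in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ReadOptions) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ReadOptions opts = new ReadOptions();
        opts.executorCredentialRefresh = false; // the non-default value must survive
        ReadOptions back = roundTrip(opts);
        System.out.println(back.executorCredentialRefresh); // prints "false"
    }
}
```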
The default is `true`, so every existing job behaves identically without touching configs. Only users who explicitly set the new option to `false` opt into the new path.