From 52cecc457a45a1445814ba61c6d151ee07af8716 Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Wed, 1 Apr 2026 14:57:17 +0530 Subject: [PATCH 1/2] Use non-cached FileSystem in CommitActivityImpl to prevent DFSClient hang on close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When CommitActivity closes its FileSystem, the cached (shared) instance tears down the IPC Client shared across all activities on the same worker. Client.stop() waits indefinitely for all in-flight RPC calls to drain — including calls from concurrently running ProcessWorkUnit activities — causing the Commit activity thread to hang forever. loadFileSystemForce() disables FS caching, giving CommitActivity its own private DFSClient. By the time commit closes the FS, its own calls are done, so Client.stop() returns immediately regardless of other activities' state. Co-Authored-By: Claude Sonnet 4.6 --- .../gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 10b045232c6..28200ec45c5 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -102,7 +102,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS; Optional optJobName = Optional.empty(); AutomaticTroubleshooter troubleshooter = null; - try (FileSystem fs = Help.loadFileSystem(workSpec)) { + try (FileSystem fs = Help.loadFileSystemForce(workSpec)) { JobState jobState = Help.loadJobState(workSpec, fs); int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); From dfdc06ecb6b45f2660c312e7b2f1bd1b452b714f Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Wed, 1 Apr 2026 23:54:15 +0530 Subject: [PATCH 2/2] making it async the fs closure --- .../ddm/activity/impl/CommitActivityImpl.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 28200ec45c5..73647b19efc 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -102,7 +102,9 @@ public CommitStats commit(WUProcessingSpec workSpec) { int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS; Optional optJobName = Optional.empty(); AutomaticTroubleshooter troubleshooter = null; - try (FileSystem fs = Help.loadFileSystemForce(workSpec)) { + FileSystem fs = null; + try { + fs = Help.loadFileSystem(workSpec); JobState jobState = Help.loadJobState(workSpec, fs); int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); @@ -172,6 +174,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { new IOException(e) ); } finally { + closeFileSystemAsync(fs); String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec)); EventSubmitter eventSubmitter = workSpec.getEventSubmitterContext().create(); Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, errCorrelator); @@ -179,6 +182,29 @@ public CommitStats commit(WUProcessingSpec workSpec) { } } + /** + * Closes a {@link FileSystem} on a daemon thread to avoid blocking the caller. + * + * Using a non-cached (force-loaded) FileSystem means each commit activity owns its own {@link org.apache.hadoop.hdfs.DFSClient} + * and therefore its own IPC {@code Client}. Closing that Client via {@code Client.stop()} waits for all open + * connections to drain, which can hang indefinitely when connection threads are blocked in native socket reads. + * By closing asynchronously we keep the activity thread unblocked while cleanup proceeds in the background. + */ + private void closeFileSystemAsync(FileSystem fs) { + if (fs == null) { + return; + } + Thread t = new Thread(() -> { + try { + fs.close(); + } catch (IOException e) { + log.warn("Async FileSystem close encountered an error (fs: {})", fs.getUri(), e); + } + }, "CommitActivity-AsyncFSClose"); + t.setDaemon(true); + t.start(); + } + /** * Commit task states to the dataset state store. * @param jobState