NOTE(review): re-rendered from whitespace-collapsed text. Indentation (2-space Java) and the position of
the blank context line in the first hunk are assumed -- confirm against the target file before applying.
NOTE(review): "Optional optJobName" is reproduced as found; a type argument may have been lost -- confirm.
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 10b045232c6..73647b19efc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -102,7 +102,9 @@ public CommitStats commit(WUProcessingSpec workSpec) {
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
     Optional optJobName = Optional.empty();
     AutomaticTroubleshooter troubleshooter = null;
-    try (FileSystem fs = Help.loadFileSystem(workSpec)) {
+    FileSystem fs = null;
+    try {
+      fs = Help.loadFileSystem(workSpec);
       JobState jobState = Help.loadJobState(workSpec, fs);
 
       int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
@@ -172,6 +174,7 @@ public CommitStats commit(WUProcessingSpec workSpec) {
           new IOException(e)
       );
     } finally {
+      closeFileSystemAsync(fs);
       String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec));
       EventSubmitter eventSubmitter = workSpec.getEventSubmitterContext().create();
       Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, errCorrelator);
@@ -179,6 +182,33 @@ public CommitStats commit(WUProcessingSpec workSpec) {
     }
   }
 
+  /**
+   * Closes a {@link FileSystem} on a daemon thread to avoid blocking the caller.
+   *
+   * Using a non-cached (force-loaded) FileSystem means each commit activity owns its own {@link org.apache.hadoop.hdfs.DFSClient}
+   * and therefore its own IPC {@code Client}. Closing that Client via {@code Client.stop()} waits for all open
+   * connections to drain, which can hang indefinitely when connection threads are blocked in native socket reads.
+   * By closing asynchronously we keep the activity thread unblocked while cleanup proceeds in the background.
+   *
+   * Any failure of the background close, checked or unchecked, is logged at WARN and otherwise ignored.
+   *
+   * @param fs the file system to close; {@code null} is a no-op
+   */
+  private void closeFileSystemAsync(FileSystem fs) {
+    if (fs == null) {
+      return;
+    }
+    Thread t = new Thread(() -> {
+      try {
+        fs.close();
+      } catch (Exception e) {
+        log.warn("Async FileSystem close encountered an error (fs: {})", fs.getUri(), e);
+      }
+    }, "CommitActivity-AsyncFSClose");
+    t.setDaemon(true);
+    t.start();
+  }
+
   /**
    * Commit task states to the dataset state store.
    * @param jobState