diff --git a/dataset/pom.xml b/dataset/pom.xml
index 7a0210ce95..6824cb2f61 100644
--- a/dataset/pom.xml
+++ b/dataset/pom.xml
@@ -111,6 +111,67 @@ under the License.
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${dep.hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${dep.hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${dep.hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git a/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
index fcf124a61f..864ce51a9a 100644
--- a/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
+++ b/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
@@ -16,7 +16,11 @@
*/
package org.apache.arrow.dataset.file;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.LinkedHashSet;
import java.util.Optional;
+import java.util.Set;
import org.apache.arrow.dataset.jni.NativeDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.FragmentScanOptions;
@@ -25,9 +29,12 @@
/** Java binding of the C++ FileSystemDatasetFactory. */
public class FileSystemDatasetFactory extends NativeDatasetFactory {
+  private final Set<URI> hdfsFileSystems;
+
public FileSystemDatasetFactory(
BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) {
super(allocator, memoryPool, createNative(format, uri, Optional.empty()));
+ this.hdfsFileSystems = toHdfsFileSystems(uri);
}
public FileSystemDatasetFactory(
@@ -37,11 +44,13 @@ public FileSystemDatasetFactory(
String uri,
       Optional<FragmentScanOptions> fragmentScanOptions) {
super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions));
+ this.hdfsFileSystems = toHdfsFileSystems(uri);
}
public FileSystemDatasetFactory(
BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) {
super(allocator, memoryPool, createNative(format, uris, Optional.empty()));
+ this.hdfsFileSystems = toHdfsFileSystems(uris);
}
public FileSystemDatasetFactory(
@@ -51,6 +60,65 @@ public FileSystemDatasetFactory(
String[] uris,
       Optional<FragmentScanOptions> fragmentScanOptions) {
super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions));
+ this.hdfsFileSystems = toHdfsFileSystems(uris);
+ }
+
+ /**
+ * Close this factory and release the native instance. For HDFS URIs, also closes the cached
+ * Hadoop FileSystem to release non-daemon threads that would otherwise prevent JVM exit. See #1067.
+ */
+ @Override
+ public synchronized void close() {
+ try {
+ super.close();
+ } finally {
+ hdfsFileSystems.forEach(FileSystemDatasetFactory::closeHadoopFileSystem);
+ }
+ }
+
+ /**
+ * For each {@code hdfs://} URI, close the cached Hadoop FileSystem.
+ * When Arrow C++ accesses HDFS via libhdfs, the Hadoop Java client creates cached FileSystem
+ * instances with non-daemon threads (IPC connections, lease renewers) that prevent JVM exit.
+ * Closing the FileSystem terminates these connections. Uses reflection to avoid a compile-time
+ * dependency on hadoop-common.
+ */
+ static void closeHadoopFileSystemsIfHdfs(String... uris) {
+ toHdfsFileSystems(uris).forEach(FileSystemDatasetFactory::closeHadoopFileSystem);
+ }
+
+  private static Set<URI> toHdfsFileSystems(String... uris) {
+    Set<URI> hdfsFileSystems = new LinkedHashSet<>();
+ if (uris == null) {
+ return hdfsFileSystems;
+ }
+ for (String uri : uris) {
+ try {
+ URI parsedUri = new URI(uri);
+ if ("hdfs".equalsIgnoreCase(parsedUri.getScheme())) {
+ hdfsFileSystems.add(
+ new URI(parsedUri.getScheme(), parsedUri.getAuthority(), null, null, null));
+ }
+ } catch (Exception e) {
+ // Ignore here; native factory creation reports invalid user URIs.
+ }
+ }
+ return hdfsFileSystems;
+ }
+
+ private static void closeHadoopFileSystem(URI hdfsUri) {
+ try {
+      Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
+ Object conf = confClass.getDeclaredConstructor().newInstance();
+      Class<?> fsClass = Class.forName("org.apache.hadoop.fs.FileSystem");
+ Method getMethod = fsClass.getMethod("get", URI.class, confClass);
+ Object fs = getMethod.invoke(null, hdfsUri, conf);
+ Method closeMethod = fsClass.getMethod("close");
+ closeMethod.invoke(fs);
+ } catch (Exception e) {
+ // Best-effort cleanup; Hadoop may not be on classpath or FileSystem already closed
+ }
}
private static long createNative(
diff --git a/dataset/src/test/java/org/apache/arrow/dataset/file/TestHdfsFileSystemCleanup.java b/dataset/src/test/java/org/apache/arrow/dataset/file/TestHdfsFileSystemCleanup.java
new file mode 100644
index 0000000000..d0bbd4d01a
--- /dev/null
+++ b/dataset/src/test/java/org/apache/arrow/dataset/file/TestHdfsFileSystemCleanup.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.file;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Regression test for #1067. */
+public class TestHdfsFileSystemCleanup {
+
+ private static final int CHILD_TIMEOUT_SECONDS = 10;
+
+ private static MiniDFSCluster cluster;
+
+ @TempDir static File clusterDir;
+
+ @BeforeAll
+ static void startCluster() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterDir.getAbsolutePath());
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ }
+
+ @AfterAll
+ static void stopCluster() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ void testJvmHangsWithoutCleanup() throws Exception {
+ Process child = forkChildProcess(false);
+ assertFalse(waitForExit(child), "JVM should hang when HDFS is not cleaned up");
+ }
+
+ @Test
+ void testJvmExitsWithCleanup() throws Exception {
+ Process child = forkChildProcess(true);
+ assertTrue(
+ waitForExit(child), "JVM should exit when FileSystemDatasetFactory cleanup runs");
+ assertEquals(0, child.exitValue(), "Child process should exit cleanly (exit code 0)");
+ }
+
+ private boolean waitForExit(Process child) throws InterruptedException {
+ boolean exited = child.waitFor(CHILD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!exited) {
+ child.destroyForcibly();
+ }
+ return exited;
+ }
+
+ private Process forkChildProcess(boolean withCleanup) throws IOException {
+ String classpath = System.getProperty("java.class.path");
+ int port = cluster.getNameNodePort();
+ ProcessBuilder pb =
+ new ProcessBuilder(
+ ProcessHandle.current().info().command().orElse("java"),
+ "-cp",
+ classpath,
+ HdfsClientSimulator.class.getName(),
+ String.valueOf(port),
+ String.valueOf(withCleanup));
+ pb.redirectError(ProcessBuilder.Redirect.DISCARD);
+ pb.redirectOutput(ProcessBuilder.Redirect.DISCARD);
+ return pb.start();
+ }
+
+ /** Simulates libhdfs leaving a non-daemon thread attached to an HDFS connection. */
+ public static class HdfsClientSimulator {
+ public static void main(String[] args) throws Exception {
+ int port = Integer.parseInt(args[0]);
+ boolean withCleanup = Boolean.parseBoolean(args[1]);
+
+ Configuration conf = new Configuration();
+ String hdfsUri = "hdfs://localhost:" + port;
+ conf.set("fs.defaultFS", hdfsUri);
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.exists(new Path("/"));
+
+ Thread connectionThread =
+ new Thread(
+ () -> {
+ while (true) {
+ try {
+ fs.getFileStatus(new Path("/"));
+ Thread.sleep(500);
+ } catch (Exception e) {
+ break;
+ }
+ }
+ },
+ "simulated-libhdfs-ipc-thread");
+ connectionThread.setDaemon(false);
+ connectionThread.start();
+
+ if (withCleanup) {
+ FileSystemDatasetFactory.closeHadoopFileSystemsIfHdfs(hdfsUri);
+ connectionThread.join(5000);
+ }
+ }
+ }
+}