From d9ea251b76ca32efc2121f42ddf592ba30ccc8f3 Mon Sep 17 00:00:00 2001 From: ugurtafrali Date: Thu, 9 Apr 2026 00:21:06 +0300 Subject: [PATCH] Fix canceled federated SERVICE queries wedging dataset (GH-3837) When a federated SERVICE query gets canceled, the old code still tried to drain the response body on close. This caused the HTTP client to wait for the full response even though the query was aborted, which could wedge the connection and lock up the target dataset. Add isAborted check in closeRetainedConnection() to skip draining when the query was canceled. Also add test case to verify aborted queries don't hang on close. --- .../jena/sparql/exec/http/QueryExecHTTP.java | 14 +++++--- .../sparql/exec/http/TestQueryExecHTTP.java | 32 +++++++++++++++++++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java index f7ffc2f2f7c..bce2484d4e8 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java @@ -762,12 +762,16 @@ private static void cancelFuture(CompletableFuture future) { private void closeRetainedConnection() { if (retainedConnection != null) { try { - // This call may take a long time if the response has not been consumed - // as HTTP client will consume the remaining response so it can re-use the - // connection. If we're closing when we're not at the end of the stream then - // issue a warning to the logs - if (retainedConnection.read() != -1) + if (isAborted) { + // Don't drain on abort - cancel the request and close without reading. + cancelFuture(future); + } else if (retainedConnection.read() != -1) { + // This call may take a long time if the response has not been consumed + // as HTTP client will consume the remaining response so it can re-use the + // connection. If we're closing when we're not at the end of the stream then + // issue a warning to the logs Log.warn(this, "HTTP response not fully consumed, if HTTP Client is reusing connections (its default behaviour) then it will consume the remaining response data which may take a long time and cause this application to become unresponsive"); + } retainedConnection.close(); } catch (RuntimeIOException | java.io.IOException e) { // If we are closing early and the underlying stream is chunk encoded diff --git a/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java index ae30518ee9b..33f0e64d43d 100644 --- a/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java +++ b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java @@ -27,10 +27,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Iterator; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.json.JsonArray; @@ -339,4 +341,34 @@ public void query_graph_uri_5() { assertEquals(2, x); } } + + // Aborting a streaming QueryExecHTTP must not block on close and must not + // leave the server unable to serve subsequent queries (GH-3837). + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + public void query_abort_no_wedge() { + DatasetGraph dsg2 = DatasetGraphFactory.createTxnMem(); + for (int i = 0; i < 500; i++) + dsg2.add(parseQuad("(_ )")); + FusekiServer server2 = FusekiServer.create().port(0).add("/ds2", dsg2).build(); + server2.start(); + String url2 = "http://localhost:" + server2.getPort() + "/ds2"; + try { + for (int i = 0; i < 10; i++) { + try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder() + .endpoint(url2).queryString("SELECT * { ?s ?p ?o }").build()) { + qExec.select(); + qExec.abort(); + } + } + // Server must still respond after the burst of aborted queries. + try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder() + .endpoint(url2).queryString("SELECT (COUNT(*) AS ?n) { ?s ?p ?o }").build()) { + RowSet rs = qExec.select(); + assertTrue(rs.hasNext()); + } + } finally { + server2.stop(); + } + } }