diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java index f7ffc2f2f7c..bce2484d4e8 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java @@ -762,12 +762,16 @@ private static void cancelFuture(CompletableFuture future) { private void closeRetainedConnection() { if (retainedConnection != null) { try { - // This call may take a long time if the response has not been consumed - // as HTTP client will consume the remaining response so it can re-use the - // connection. If we're closing when we're not at the end of the stream then - // issue a warning to the logs - if (retainedConnection.read() != -1) + if (isAborted) { + // Don't drain on abort - cancel the request and close without reading. + cancelFuture(future); + } else if (retainedConnection.read() != -1) { + // This call may take a long time if the response has not been consumed + // as HTTP client will consume the remaining response so it can re-use the + // connection. If we're closing when we're not at the end of the stream then + // issue a warning to the logs Log.warn(this, "HTTP response not fully consumed, if HTTP Client is reusing connections (its default behaviour) then it will consume the remaining response data which may take a long time and cause this application to become unresponsive"); + } retainedConnection.close(); } catch (RuntimeIOException | java.io.IOException e) { // If we are closing early and the underlying stream is chunk encoded diff --git a/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java index ae30518ee9b..33f0e64d43d 100644 --- a/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java +++ b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/http/TestQueryExecHTTP.java @@ -27,10 +27,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Iterator; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.json.JsonArray; @@ -339,4 +341,34 @@ public void query_graph_uri_5() { assertEquals(2, x); } } + + // Aborting a streaming QueryExecHTTP must not block on close and must not + // leave the server unable to serve subsequent queries (GH-3837). + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + public void query_abort_no_wedge() { + DatasetGraph dsg2 = DatasetGraphFactory.createTxnMem(); + for (int i = 0; i < 500; i++) + dsg2.add(parseQuad("(_ )")); + FusekiServer server2 = FusekiServer.create().port(0).add("/ds2", dsg2).build(); + server2.start(); + String url2 = "http://localhost:" + server2.getPort() + "/ds2"; + try { + for (int i = 0; i < 10; i++) { + try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder() + .endpoint(url2).queryString("SELECT * { ?s ?p ?o }").build()) { + qExec.select(); + qExec.abort(); + } + } + // Server must still respond after the burst of aborted queries. + try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder() + .endpoint(url2).queryString("SELECT (COUNT(*) AS ?n) { ?s ?p ?o }").build()) { + RowSet rs = qExec.select(); + assertTrue(rs.hasNext()); + } + } finally { + server2.stop(); + } + } }