Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -762,12 +762,16 @@ private static void cancelFuture(CompletableFuture<?> future) {
private void closeRetainedConnection() {
if (retainedConnection != null) {
try {
// This call may take a long time if the response has not been consumed
// as HTTP client will consume the remaining response so it can re-use the
// connection. If we're closing when we're not at the end of the stream then
// issue a warning to the logs
if (retainedConnection.read() != -1)
if (isAborted) {
// Don't drain on abort - cancel the request and close without reading.
cancelFuture(future);
} else if (retainedConnection.read() != -1) {
// This call may take a long time if the response has not been consumed
// as HTTP client will consume the remaining response so it can re-use the
// connection. If we're closing when we're not at the end of the stream then
// issue a warning to the logs
Log.warn(this, "HTTP response not fully consumed, if HTTP Client is reusing connections (its default behaviour) then it will consume the remaining response data which may take a long time and cause this application to become unresponsive");
}
retainedConnection.close();
} catch (RuntimeIOException | java.io.IOException e) {
// If we are closing early and the underlying stream is chunk encoded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.json.JsonArray;
Expand Down Expand Up @@ -339,4 +341,34 @@ public void query_graph_uri_5() {
assertEquals(2, x);
}
}

// Aborting a streaming QueryExecHTTP must not block on close and must not
// leave the server unable to serve subsequent queries (GH-3837).
@Test
@Timeout(value = 10, unit = TimeUnit.SECONDS)
public void query_abort_no_wedge() {
DatasetGraph dsg2 = DatasetGraphFactory.createTxnMem();
for (int i = 0; i < 500; i++)
dsg2.add(parseQuad("(_ <http://example/s" + i + "> <http://example/p> <http://example/o" + i + ">)"));
FusekiServer server2 = FusekiServer.create().port(0).add("/ds2", dsg2).build();
server2.start();
String url2 = "http://localhost:" + server2.getPort() + "/ds2";
try {
for (int i = 0; i < 10; i++) {
try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder()
.endpoint(url2).queryString("SELECT * { ?s ?p ?o }").build()) {
qExec.select();
qExec.abort();
}
}
// Server must still respond after the burst of aborted queries.
try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder()
.endpoint(url2).queryString("SELECT (COUNT(*) AS ?n) { ?s ?p ?o }").build()) {
RowSet rs = qExec.select();
assertTrue(rs.hasNext());
}
} finally {
server2.stop();
}
}
}