Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/trino/client/statement_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def post_query_request!
init_request(req)
end
rescue Faraday::TimeoutError, Faraday::ConnectionFailed
# POST /v1/statement is not idempotent; retrying transport-level errors
# (timeouts/connection failures) can submit duplicate queries. This is
# intentional under the assumption the failure happened before Trino
# received the request (e.g. rejected by a load balancer upstream).
throw :retry_with_backoff
rescue => e
exception! e
Expand Down
42 changes: 28 additions & 14 deletions spec/statement_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
}
end

it "retries POST on Timeout::Error" do
it "retries POST on Faraday::TimeoutError" do
Comment thread
yuokada marked this conversation as resolved.
attempts = 0
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
Expand Down Expand Up @@ -210,22 +210,36 @@
expect(attempts).to eq 2
end

it "retries POST on 503 response" do
attempts = 0
[502, 503, 504].each do |status|
it "retries POST on #{status} response" do
attempts = 0
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
to_return(lambda { |req|
attempts += 1
if attempts < 2
{status: status, body: "service unavailable"}
else
{status: 200, body: response_json.to_json}
end
})

sc = StatementClient.new(faraday, query, options)
expect(sc.query_id).to eq "queryid"
expect(attempts).to eq 2
end
end

it "raises TrinoHttpError after retry_timeout is exhausted on POST" do
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
to_return(lambda { |req|
attempts += 1
if attempts < 2
{status: 503, body: "service unavailable"}
else
{status: 200, body: response_json.to_json}
end
})
to_return(status: 503, body: "service unavailable")

sc = StatementClient.new(faraday, query, options)
expect(sc.query_id).to eq "queryid"
expect(attempts).to eq 2
expect do
StatementClient.new(faraday, query, options.merge(retry_timeout: 0))
end.to raise_error(Trino::Client::TrinoHttpError, "Trino API error due to timeout") do |e|
expect(e.status).to eq 408
end
end

it "does not retry POST on deterministic 4xx errors" do
Expand Down
Loading