Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions lib/trino/client/statement_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class StatementClient
:max_nesting => false
}

RETRYABLE_STATUSES = [502, 503, 504]

def initialize(faraday, query, options, next_uri=nil)
@faraday = faraday

Expand Down Expand Up @@ -66,16 +68,27 @@ def init_request(req)

def post_query_request!
uri = "/v1/statement"
response = @faraday.post do |req|
req.url uri
response = with_retry_loop do
begin
r = @faraday.post do |req|
req.url uri

req.body = @query
init_request(req)
end
req.body = @query
init_request(req)
end
rescue Faraday::TimeoutError, Faraday::ConnectionFailed
throw :retry_with_backoff
rescue => e
exception! e
end

# TODO error handling
if response.status != 200
exception! TrinoHttpError.new(response.status, "Failed to start query: #{response.body} (#{response.status})")
if r.status == 200
r
elsif RETRYABLE_STATUSES.include?(r.status)
throw :retry_with_backoff
else
exception! TrinoHttpError.new(r.status, "Failed to start query: #{r.body} (#{r.status})")
end
end

@results_headers = response.headers
Expand Down Expand Up @@ -189,31 +202,12 @@ def parse_body(response)

private :parse_body

def faraday_get_with_retry(uri, &block)
def with_retry_loop
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
attempts = 0

loop do
begin
response = @faraday.get(uri)
rescue Faraday::TimeoutError, Faraday::ConnectionFailed
# temporally error to retry
response = nil
rescue => e
exception! e
end

if response
if response.status == 200 && !response.body.to_s.empty?
return response
end

# retry if 502, 503, 504 according to the trino protocol
unless [502, 503, 504].include?(response.status)
# deterministic error
exception! TrinoHttpError.new(response.status, "Trino API error at #{uri} returned #{response.status}: #{response.body}")
end
end
catch(:retry_with_backoff) { return yield }

raise_if_timeout!

Expand All @@ -226,6 +220,28 @@ def faraday_get_with_retry(uri, &block)
exception! TrinoHttpError.new(408, "Trino API error due to timeout")
end

private :with_retry_loop

def faraday_get_with_retry(uri)
with_retry_loop do
begin
response = @faraday.get(uri)
rescue Faraday::TimeoutError, Faraday::ConnectionFailed
throw :retry_with_backoff
rescue => e
exception! e
end

if response.status == 200 && !response.body.to_s.empty?
response
elsif RETRYABLE_STATUSES.include?(response.status)
throw :retry_with_backoff
else
exception! TrinoHttpError.new(response.status, "Trino API error at #{uri} returned #{response.status}: #{response.body}")
end
end
end

def raise_if_timeout!
if @started_at
return if finished?
Expand Down
82 changes: 82 additions & 0 deletions spec/statement_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,88 @@
end
end

describe "POST /v1/statement retry" do
let :headers do
{
"User-Agent" => "trino-ruby/#{VERSION}",
"X-Trino-Catalog" => options[:catalog],
"X-Trino-Schema" => options[:schema],
"X-Trino-User" => options[:user],
"X-Trino-Language" => options[:language],
"X-Trino-Time-Zone" => options[:time_zone],
}
end

it "retries POST on Timeout::Error" do
attempts = 0
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
to_return(body: lambda { |req|
attempts += 1
if attempts < 2
raise Timeout::Error.new("execution expired")
else
response_json.to_json
end
})

sc = StatementClient.new(faraday, query, options.merge(http_open_timeout: 1))
expect(sc.query_id).to eq "queryid"
expect(attempts).to eq 2
end

it "retries POST on Faraday::ConnectionFailed" do
attempts = 0
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
to_return(body: lambda { |req|
attempts += 1
if attempts < 2
raise Faraday::ConnectionFailed.new("connection refused")
else
response_json.to_json
end
})

sc = StatementClient.new(faraday, query, options)
expect(sc.query_id).to eq "queryid"
expect(attempts).to eq 2
end

it "retries POST on 503 response" do
attempts = 0
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
to_return(lambda { |req|
attempts += 1
if attempts < 2
{status: 503, body: "service unavailable"}
else
{status: 200, body: response_json.to_json}
end
})

sc = StatementClient.new(faraday, query, options)
expect(sc.query_id).to eq "queryid"
expect(attempts).to eq 2
end

it "does not retry POST on deterministic 4xx errors" do
attempts = 0
stub_request(:post, "localhost/v1/statement").
with(body: query, headers: headers).
to_return(lambda { |req|
attempts += 1
{status: 400, body: "bad request"}
})

expect do
StatementClient.new(faraday, query, options)
end.to raise_error(Trino::Client::TrinoHttpError, /Failed to start query: bad request \(400\)/)
expect(attempts).to eq 1
end
end

it "receives headers of POST" do
stub_request(:post, "localhost/v1/statement").
with(body: query).to_return(body: response_json2.to_json, headers: {"X-Test-Header" => "123"})
Expand Down
Loading