From 8b7078f1541a39b6fbe23f55a7c4306b9b976e30 Mon Sep 17 00:00:00 2001 From: murakami-ta Date: Fri, 5 Jun 2026 09:36:51 +0900 Subject: [PATCH] Add retry to POST /v1/statement --- lib/trino/client/statement_client.rb | 74 +++++++++++++++---------- spec/statement_client_spec.rb | 82 ++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 29 deletions(-) diff --git a/lib/trino/client/statement_client.rb b/lib/trino/client/statement_client.rb index b8ea2902..9d6bba91 100644 --- a/lib/trino/client/statement_client.rb +++ b/lib/trino/client/statement_client.rb @@ -26,6 +26,8 @@ class StatementClient :max_nesting => false } + RETRYABLE_STATUSES = [502, 503, 504] + def initialize(faraday, query, options, next_uri=nil) @faraday = faraday @@ -66,16 +68,27 @@ def init_request(req) def post_query_request! uri = "/v1/statement" - response = @faraday.post do |req| - req.url uri + response = with_retry_loop do + begin + r = @faraday.post do |req| + req.url uri - req.body = @query - init_request(req) - end + req.body = @query + init_request(req) + end + rescue Faraday::TimeoutError, Faraday::ConnectionFailed + throw :retry_with_backoff + rescue => e + exception! e + end - # TODO error handling - if response.status != 200 - exception! TrinoHttpError.new(response.status, "Failed to start query: #{response.body} (#{response.status})") + if r.status == 200 + r + elsif RETRYABLE_STATUSES.include?(r.status) + throw :retry_with_backoff + else + exception! TrinoHttpError.new(r.status, "Failed to start query: #{r.body} (#{r.status})") + end end @results_headers = response.headers @@ -189,31 +202,12 @@ def parse_body(response) private :parse_body - def faraday_get_with_retry(uri, &block) + def with_retry_loop start = Process.clock_gettime(Process::CLOCK_MONOTONIC) attempts = 0 loop do - begin - response = @faraday.get(uri) - rescue Faraday::TimeoutError, Faraday::ConnectionFailed - # temporally error to retry - response = nil - rescue => e - exception! e - end - - if response - if response.status == 200 && !response.body.to_s.empty? - return response - end - - # retry if 502, 503, 504 according to the trino protocol - unless [502, 503, 504].include?(response.status) - # deterministic error - exception! TrinoHttpError.new(response.status, "Trino API error at #{uri} returned #{response.status}: #{response.body}") - end - end + catch(:retry_with_backoff) { return yield } raise_if_timeout! @@ -226,6 +220,28 @@ def faraday_get_with_retry(uri, &block) exception! TrinoHttpError.new(408, "Trino API error due to timeout") end + private :with_retry_loop + + def faraday_get_with_retry(uri) + with_retry_loop do + begin + response = @faraday.get(uri) + rescue Faraday::TimeoutError, Faraday::ConnectionFailed + throw :retry_with_backoff + rescue => e + exception! e + end + + if response.status == 200 && !response.body.to_s.empty? + response + elsif RETRYABLE_STATUSES.include?(response.status) + throw :retry_with_backoff + else + exception! TrinoHttpError.new(response.status, "Trino API error at #{uri} returned #{response.status}: #{response.body}") + end + end + end + def raise_if_timeout! if @started_at return if finished? diff --git a/spec/statement_client_spec.rb b/spec/statement_client_spec.rb index 1696c220..b83754ed 100644 --- a/spec/statement_client_spec.rb +++ b/spec/statement_client_spec.rb @@ -162,6 +162,88 @@ end end + describe "POST /v1/statement retry" do + let :headers do + { + "User-Agent" => "trino-ruby/#{VERSION}", + "X-Trino-Catalog" => options[:catalog], + "X-Trino-Schema" => options[:schema], + "X-Trino-User" => options[:user], + "X-Trino-Language" => options[:language], + "X-Trino-Time-Zone" => options[:time_zone], + } + end + + it "retries POST on Timeout::Error" do + attempts = 0 + stub_request(:post, "localhost/v1/statement"). + with(body: query, headers: headers). + to_return(body: lambda { |req| + attempts += 1 + if attempts < 2 + raise Timeout::Error.new("execution expired") + else + response_json.to_json + end + }) + + sc = StatementClient.new(faraday, query, options.merge(http_open_timeout: 1)) + expect(sc.query_id).to eq "queryid" + expect(attempts).to eq 2 + end + + it "retries POST on Faraday::ConnectionFailed" do + attempts = 0 + stub_request(:post, "localhost/v1/statement"). + with(body: query, headers: headers). + to_return(body: lambda { |req| + attempts += 1 + if attempts < 2 + raise Faraday::ConnectionFailed.new("connection refused") + else + response_json.to_json + end + }) + + sc = StatementClient.new(faraday, query, options) + expect(sc.query_id).to eq "queryid" + expect(attempts).to eq 2 + end + + it "retries POST on 503 response" do + attempts = 0 + stub_request(:post, "localhost/v1/statement"). + with(body: query, headers: headers). + to_return(lambda { |req| + attempts += 1 + if attempts < 2 + {status: 503, body: "service unavailable"} + else + {status: 200, body: response_json.to_json} + end + }) + + sc = StatementClient.new(faraday, query, options) + expect(sc.query_id).to eq "queryid" + expect(attempts).to eq 2 + end + + it "does not retry POST on deterministic 4xx errors" do + attempts = 0 + stub_request(:post, "localhost/v1/statement"). + with(body: query, headers: headers). + to_return(lambda { |req| + attempts += 1 + {status: 400, body: "bad request"} + }) + + expect do + StatementClient.new(faraday, query, options) + end.to raise_error(Trino::Client::TrinoHttpError, /Failed to start query: bad request \(400\)/) + expect(attempts).to eq 1 + end + end + it "receives headers of POST" do stub_request(:post, "localhost/v1/statement"). with(body: query).to_return(body: response_json2.to_json, headers: {"X-Test-Header" => "123"})