diff --git a/apisix/balancer.lua b/apisix/balancer.lua index 5b6b3b0291f3..a4651ca63765 100644 --- a/apisix/balancer.lua +++ b/apisix/balancer.lua @@ -34,6 +34,9 @@ local pickers = {} local lrucache_server_picker = core.lrucache.new({ ttl = 300, count = 256 }) +local lrucache_health_status = core.lrucache.new({ + ttl = 300, count = 256 +}) local lrucache_addr = core.lrucache.new({ ttl = 300, count = 1024 * 4 }) @@ -95,6 +98,63 @@ local function fetch_health_nodes(upstream, checker) end +local function fetch_all_nodes(upstream) + local nodes = upstream.nodes + local new_nodes = core.table.new(0, #nodes) + for _, node in ipairs(nodes) do + new_nodes = transform_node(new_nodes, node) + end + return new_nodes +end + + +local function create_health_status(upstream, checker) + local nodes = upstream.nodes + local host = upstream.checks and upstream.checks.active and upstream.checks.active.host + local port = upstream.checks and upstream.checks.active and upstream.checks.active.port + local health_status = core.table.new(0, #nodes) + local has_healthy_node = false + + for _, node in ipairs(nodes) do + local ok, err = healthcheck_manager.fetch_node_status(checker, + node.host, port or node.port, host) + local addr = node.host .. ":" .. node.port + if ok then + health_status[addr] = true + has_healthy_node = true + else + health_status[addr] = false + if err then + core.log.warn("failed to get health check target status, addr: ", + node.host, ":", port or node.port, ", host: ", host, ", err: ", err) + end + end + end + + if not has_healthy_node then + core.log.warn("all upstream nodes is unhealthy, use default") + return {all_unhealthy = true} + end + + return {status = health_status} +end + + +local function fetch_health_status(upstream, checker, key, version) + if not checker then + return nil + end + + local health_status = lrucache_health_status(key, version .. "#" .. checker.status_ver, + create_health_status, upstream, checker) + if not health_status or health_status.all_unhealthy then + return nil + end + + return health_status.status +end + + local function create_server_picker(upstream, checker) local picker = pickers[upstream.type] if not picker then @@ -112,7 +172,12 @@ local function create_server_picker(upstream, checker) end end - local up_nodes = fetch_health_nodes(upstream, checker) + local up_nodes + if upstream.type == "chash" then + up_nodes = fetch_all_nodes(upstream) + else + up_nodes = fetch_health_nodes(upstream, checker) + end if #up_nodes._priority_index > 1 then core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes)) @@ -229,7 +294,12 @@ local function pick_server(route, ctx) end end - if checker then + local health_status + if checker and up_conf.type == "chash" then + health_status = fetch_health_status(up_conf, checker, key, version) + end + + if checker and up_conf.type ~= "chash" then version = version .. "#" .. checker.status_ver end @@ -243,10 +313,29 @@ local function pick_server(route, ctx) return nil, "failed to fetch server picker" end - local server, err = server_picker.get(ctx) + local server, err + for _ = 1, nodes_count do + server, err = server_picker.get(ctx) + if not server then + err = err or "no valid upstream node" + return nil, "failed to find valid upstream server, " .. err + end + + if not health_status or health_status[server] then + break + end + + ctx.balancer_server = server + if not server_picker.after_balance then + return nil, "failed to skip unhealthy upstream server: after_balance is unavailable" + end + + server_picker.after_balance(ctx, true) + server = nil + end + if not server then - err = err or "no valid upstream node" - return nil, "failed to find valid upstream server, " .. err + return nil, "failed to find valid upstream server, all upstream servers tried" end ctx.balancer_server = server diff --git a/apisix/healthcheck_manager.lua b/apisix/healthcheck_manager.lua index 8133364ee292..249e63ef5a74 100644 --- a/apisix/healthcheck_manager.lua +++ b/apisix/healthcheck_manager.lua @@ -113,7 +113,13 @@ function _M.fetch_node_status(checker, ip, port, hostname) return true end - return checker:get_target_status(ip, port, hostname) + local ok, err = checker:get_target_status(ip, port, hostname) + if err == "target not found" then + -- Checker targets are created asynchronously, so this means unknown. + return true + end + + return ok, err end diff --git a/apisix/plugins/ai-proxy-multi.lua b/apisix/plugins/ai-proxy-multi.lua index 13a4b8e3e6dc..3c8df5a559fe 100644 --- a/apisix/plugins/ai-proxy-multi.lua +++ b/apisix/plugins/ai-proxy-multi.lua @@ -45,6 +45,9 @@ local pickers = {} local lrucache_server_picker = core.lrucache.new({ ttl = 300, count = 256 }) +local lrucache_health_status = core.lrucache.new({ + ttl = 300, count = 256 +}) local plugin_name = "ai-proxy-multi" local _M = { @@ -395,6 +398,105 @@ local function fetch_health_instances(conf, checkers) end +local function fetch_all_instances(conf) + local instances = conf.instances + local new_instances = core.table.new(0, #instances) + for _, ins in ipairs(instances) do + transform_instances(new_instances, ins) + end + + return new_instances +end + + +local function create_health_status(conf, checkers) + local instances = conf.instances + local health_status = core.table.new(0, #instances) + local healthy_dns_nodes = core.table.new(0, #instances) + local has_healthy_instance = false + + for _, ins in ipairs(instances) do + local checker = checkers[ins.name] + if checker then + local host = ins.checks and ins.checks.active and ins.checks.active.host + local port = ins.checks and ins.checks.active and ins.checks.active.port + local healthy_nodes = {} + + for _, node in ipairs(ins._dns_nodes or {}) do + local ok, err = checker:get_target_status(node.host, port or node.port, host) + if ok then + healthy_nodes[#healthy_nodes + 1] = node + elseif err then + core.log.warn("failed to get health check target status, addr: ", + node.host, ":", port or node.port, ", host: ", host, ", err: ", err) + end + end + + if #healthy_nodes > 0 then + healthy_dns_nodes[ins.name] = healthy_nodes + health_status[ins.name] = true + has_healthy_instance = true + else + health_status[ins.name] = false + end + else + health_status[ins.name] = true + has_healthy_instance = true + end + end + + if not has_healthy_instance then + core.log.warn("all upstream nodes is unhealthy, use default") + return {all_unhealthy = true} + end + + return { + status = health_status, + healthy_dns_nodes = healthy_dns_nodes, + } +end + + +local function apply_health_status(conf, health_status) + if not health_status or health_status.all_unhealthy then + for _, ins in ipairs(conf.instances) do + ins._healthy_dns_nodes = nil + end + + return nil + end + + for _, ins in ipairs(conf.instances) do + ins._healthy_dns_nodes = health_status.healthy_dns_nodes[ins.name] + end + + return health_status.status +end + + +local function get_health_status_ver(conf, checkers) + local parts = core.table.new(#conf.instances, 0) + for i, ins in ipairs(conf.instances) do + local checker = checkers[ins.name] + parts[i] = (ins._nodes_ver or 0) .. ":" .. (checker and checker.status_ver or "x") + end + + return table_concat(parts, "-") +end + + +local function fetch_health_status(conf, checkers, key, version) + if not checkers then + return nil + end + + local health_status = lrucache_health_status(key, version .. "#" .. + get_health_status_ver(conf, checkers), + create_health_status, conf, checkers) + return apply_health_status(conf, health_status) +end + + local function create_server_picker(conf, ups_tab, checkers) local picker = pickers[conf.balancer.algorithm] -- nil check if not picker then @@ -402,7 +504,12 @@ local function create_server_picker(conf, ups_tab, checkers) picker = pickers[conf.balancer.algorithm] end - local new_instances = fetch_health_instances(conf, checkers) + local new_instances + if conf.balancer.algorithm == "chash" then + new_instances = fetch_all_instances(conf) + else + new_instances = fetch_health_instances(conf, checkers) + end core.log.info("fetch health instances: ", core.json.delay_encode(new_instances)) if #new_instances._priority_index > 1 then @@ -448,8 +555,13 @@ local function pick_target(ctx, conf, ups_tab) end end - local version = plugin.conf_version(conf) .. "#" .. - get_checkers_status_ver(conf, checkers) + local health_status + local version = plugin.conf_version(conf) + if conf.balancer.algorithm == "chash" then + health_status = fetch_health_status(conf, checkers, ctx.matched_route.key, version) + else + version = version .. "#" .. get_checkers_status_ver(conf, checkers) + end local server_picker = ctx.server_picker if not server_picker then @@ -461,29 +573,48 @@ local function pick_target(ctx, conf, ups_tab) end ctx.server_picker = server_picker - local instance_name, err = server_picker.get(ctx) - if err then - return nil, nil, err + local ai_rate_limiting + local check_rate_limiting = conf.fallback_strategy == "instance_health_and_rate_limiting" or + fallback_strategy_has(conf.fallback_strategy, "rate_limiting") + if check_rate_limiting then + ai_rate_limiting = require("apisix.plugins.ai-rate-limiting") end - ctx.balancer_server = instance_name - if conf.fallback_strategy == "instance_health_and_rate_limiting" or -- for backwards compatible - fallback_strategy_has(conf.fallback_strategy, "rate_limiting") then - local ai_rate_limiting = require("apisix.plugins.ai-rate-limiting") - for _ = 1, #conf.instances do - if ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then + + local instance_name, err + for _ = 1, #conf.instances do + instance_name, err = server_picker.get(ctx) + if err then + return nil, nil, err + end + + if not health_status or health_status[instance_name] then + if not check_rate_limiting or + ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then break end core.log.warn("ai instance: ", instance_name, " is not available, try to pick another one") - server_picker.after_balance(ctx, true) - instance_name, err = server_picker.get(ctx) - if err then - return nil, nil, err - end - ctx.balancer_server = instance_name + + else + core.log.warn("ai instance: ", instance_name, + " is unhealthy, try to pick another one") + end + + ctx.balancer_server = instance_name + if not server_picker.after_balance then + return nil, nil, "failed to skip AI instance: after_balance is unavailable" end + + server_picker.after_balance(ctx, true) + instance_name = nil + end + + if not instance_name then + return nil, nil, "all servers tried" end + ctx.balancer_server = instance_name + local instance_conf = get_instance_conf(conf.instances, instance_name) local nodes = instance_conf._healthy_dns_nodes or instance_conf._dns_nodes use_node_for_request(instance_conf, pick_request_node(nodes)) diff --git a/t/node/chash-healthcheck-stable-ring.t b/t/node/chash-healthcheck-stable-ring.t new file mode 100644 index 000000000000..49eca4301d13 --- /dev/null +++ b/t/node/chash-healthcheck-stable-ring.t @@ -0,0 +1,204 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('warn'); +no_root_location(); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + my $http_config = <<_EOC_; + server { + listen 127.0.0.1:16730; + + location /server_port { + content_by_lua_block { + ngx.print("16730") + } + } + + location /status { + return 500; + } + } + + server { + listen 127.0.0.1:16731; + + location /server_port { + content_by_lua_block { + ngx.print("16731") + } + } + + location /status { + return 200; + } + } + + server { + listen 127.0.0.1:16732; + + location /server_port { + content_by_lua_block { + ngx.print("16732") + } + } + + location /status { + return 200; + } + } +_EOC_ + $block->set_value("http_config", $http_config); +}); + +run_tests(); + +__DATA__ + +=== TEST 1: chash keeps healthy node mapping stable when health status changes +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local json = require("cjson.safe") + local http = require("resty.http") + + local unhealthy_port = "16730" + local nodes = { + ["127.0.0.1:" .. unhealthy_port] = 3, + ["127.0.0.1:16731"] = 6, + ["127.0.0.1:16732"] = 10, + } + + local function put_route(with_checks) + local upstream = { + type = "chash", + hash_on = "header", + key = "X-Sessionid", + nodes = nodes, + } + + if with_checks then + upstream.checks = { + active = { + http_path = "/status", + healthy = { + interval = 1, + http_statuses = {200}, + successes = 1, + }, + unhealthy = { + interval = 1, + http_statuses = {500}, + http_failures = 1, + tcp_failures = 1, + timeouts = 1, + }, + }, + } + end + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + json.encode({ + uri = "/server_port", + upstream = upstream, + }) + ) + assert(code < 300, body) + + return upstream + end + + local function send(session_id) + local httpc = http.new() + local res, err = httpc:request_uri( + "http://127.0.0.1:" .. ngx.var.server_port .. "/server_port", + { + method = "GET", + keepalive = false, + headers = { + ["X-Sessionid"] = session_id, + }, + } + ) + assert(res, err) + assert(res.status == 200, res.status .. ": " .. res.body) + return res.body + end + + put_route(false) + + local baseline = {} + local baseline_unhealthy = 0 + local healthy_total = 0 + local total = 120 + for i = 1, total do + local key = "session-" .. i + local port = send(key) + baseline[key] = port + if port == unhealthy_port then + baseline_unhealthy = baseline_unhealthy + 1 + else + healthy_total = healthy_total + 1 + end + end + assert(baseline_unhealthy > 0, "baseline did not hit unhealthy node") + assert(healthy_total > 0, "baseline did not hit healthy nodes") + + put_route(true) + send("warmup") + ngx.sleep(3) + + local healthy_moved = 0 + local unhealthy_stayed = 0 + for i = 1, total do + local key = "session-" .. i + local before = baseline[key] + local after = send(key) + if before == unhealthy_port then + if after == unhealthy_port then + unhealthy_stayed = unhealthy_stayed + 1 + end + elseif after ~= before then + healthy_moved = healthy_moved + 1 + end + end + + assert(healthy_moved == 0, + "healthy-node keys moved after health change: " .. healthy_moved) + assert(unhealthy_stayed == 0, + "unhealthy-node keys still routed to unhealthy node: " .. unhealthy_stayed) + + ngx.say("baseline_unhealthy=", baseline_unhealthy, + ", healthy_total=", healthy_total, + ", healthy_moved=", healthy_moved) + } + } +--- request +GET /t +--- timeout: 30 +--- response_body eval +qr/baseline_unhealthy=\d+, healthy_total=\d+, healthy_moved=0/ +--- no_error_log +[error] diff --git a/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t b/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t new file mode 100644 index 000000000000..147fda63b47a --- /dev/null +++ b/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t @@ -0,0 +1,252 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('warn'); +no_long_string(); +no_shuffle(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } + + my $user_yaml_config = <<_EOC_; +plugins: + - ai-proxy-multi +_EOC_ + $block->set_value("extra_yaml_config", $user_yaml_config); + + my $http_config = <<_EOC_; + server { + server_name gpu-a; + listen 127.0.0.1:16724; + default_type 'application/json'; + + location /v1/chat/completions { + content_by_lua_block { + ngx.say([[{"choices":[{"message":{"content":"gpu-a","role":"assistant"}}]}]]) + } + } + + location /status { + content_by_lua_block { + ngx.status = 500 + ngx.say("fail") + } + } + } + + server { + server_name gpu-b; + listen 127.0.0.1:16725; + default_type 'application/json'; + + location /v1/chat/completions { + content_by_lua_block { + ngx.say([[{"choices":[{"message":{"content":"gpu-b","role":"assistant"}}]}]]) + } + } + + location /status { + content_by_lua_block { + ngx.say("ok") + } + } + } + + server { + server_name gpu-c; + listen 127.0.0.1:16726; + default_type 'application/json'; + + location /v1/chat/completions { + content_by_lua_block { + ngx.say([[{"choices":[{"message":{"content":"gpu-c","role":"assistant"}}]}]]) + } + } + + location /status { + content_by_lua_block { + ngx.say("ok") + } + } + } +_EOC_ + $block->set_value("http_config", $http_config); +}); + +run_tests(); + +__DATA__ + +=== TEST 1: ai-proxy-multi chash keeps healthy instance mapping stable when health status changes +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local json = require("cjson.safe") + local http = require("resty.http") + + local checks = { + active = { + type = "http", + timeout = 1, + http_path = "/status", + healthy = { + interval = 1, + http_statuses = {200}, + successes = 1, + }, + unhealthy = { + interval = 1, + http_statuses = {500}, + http_failures = 1, + tcp_failures = 1, + timeouts = 1, + }, + }, + } + + local function instance(name, port, weight, with_checks) + local ins = { + name = name, + provider = "openai-compatible", + weight = weight, + auth = { + header = { + Authorization = "Bearer token", + }, + }, + options = { + model = name, + }, + override = { + endpoint = "http://127.0.0.1:" .. port .. "/v1/chat/completions", + }, + } + if with_checks then + ins.checks = checks + end + return ins + end + + local function put_route(with_checks) + local route = { + uri = "/ai", + plugins = { + ["ai-proxy-multi"] = { + balancer = { + algorithm = "chash", + hash_on = "header", + key = "X-Sessionid", + }, + instances = { + instance("gpu-a", 16724, 3, with_checks), + instance("gpu-b", 16725, 6, with_checks), + instance("gpu-c", 16726, 10, with_checks), + }, + ssl_verify = false, + }, + }, + } + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + json.encode(route) + ) + assert(code < 300, body) + end + + local function send(session_id) + local httpc = http.new() + local res, err = httpc:request_uri( + "http://127.0.0.1:" .. ngx.var.server_port .. "/ai", + { + method = "POST", + body = json.encode({messages = {{role = "user", content = "hi"}}}), + keepalive = false, + headers = { + ["Content-Type"] = "application/json", + ["X-Sessionid"] = session_id, + }, + } + ) + assert(res, err) + assert(res.status == 200, res.status .. ": " .. res.body) + local body = assert(json.decode(res.body)) + return body.choices[1].message.content + end + + put_route(false) + + local baseline = {} + local baseline_a = 0 + local healthy_total = 0 + local total = 120 + for i = 1, total do + local key = "session-" .. i + local name = send(key) + baseline[key] = name + if name == "gpu-a" then + baseline_a = baseline_a + 1 + else + healthy_total = healthy_total + 1 + end + end + assert(baseline_a > 0, "baseline did not hit gpu-a") + assert(healthy_total > 0, "baseline did not hit healthy instances") + + put_route(true) + send("warmup") + ngx.sleep(3) + + local healthy_moved = 0 + local unhealthy_stayed = 0 + for i = 1, total do + local key = "session-" .. i + local before = baseline[key] + local after = send(key) + if before == "gpu-a" then + if after == "gpu-a" then + unhealthy_stayed = unhealthy_stayed + 1 + end + elseif after ~= before then + healthy_moved = healthy_moved + 1 + end + end + + assert(healthy_moved == 0, + "healthy-instance keys moved after health change: " .. healthy_moved) + assert(unhealthy_stayed == 0, + "unhealthy-instance keys still routed to gpu-a: " .. unhealthy_stayed) + + ngx.say("baseline_a=", baseline_a, + ", healthy_total=", healthy_total, + ", healthy_moved=", healthy_moved) + } + } +--- timeout: 30 +--- response_body eval +qr/baseline_a=\d+, healthy_total=\d+, healthy_moved=0/ +--- no_error_log +[error]