Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ local pickers = {}
local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
})
local lrucache_health_status = core.lrucache.new({
ttl = 300, count = 256
})
local lrucache_addr = core.lrucache.new({
ttl = 300, count = 1024 * 4
})
Expand Down Expand Up @@ -95,6 +98,63 @@ local function fetch_health_nodes(upstream, checker)
end


local function fetch_all_nodes(upstream)
local nodes = upstream.nodes
local new_nodes = core.table.new(0, #nodes)
for _, node in ipairs(nodes) do
new_nodes = transform_node(new_nodes, node)
end
return new_nodes
end


local function create_health_status(upstream, checker)
local nodes = upstream.nodes
local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
local health_status = core.table.new(0, #nodes)
local has_healthy_node = false

for _, node in ipairs(nodes) do
local ok, err = healthcheck_manager.fetch_node_status(checker,
node.host, port or node.port, host)
local addr = node.host .. ":" .. node.port
if ok then
health_status[addr] = true
has_healthy_node = true
else
health_status[addr] = false
if err then
core.log.warn("failed to get health check target status, addr: ",
node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
end
end
end

if not has_healthy_node then
core.log.warn("all upstream nodes is unhealthy, use default")
return {all_unhealthy = true}
end

return {status = health_status}
end


local function fetch_health_status(upstream, checker, key, version)
if not checker then
return nil
end

local health_status = lrucache_health_status(key, version .. "#" .. checker.status_ver,
create_health_status, upstream, checker)
if not health_status or health_status.all_unhealthy then
return nil
end

return health_status.status
end


local function create_server_picker(upstream, checker)
local picker = pickers[upstream.type]
if not picker then
Expand All @@ -112,7 +172,12 @@ local function create_server_picker(upstream, checker)
end
end

local up_nodes = fetch_health_nodes(upstream, checker)
local up_nodes
if upstream.type == "chash" then
up_nodes = fetch_all_nodes(upstream)
else
up_nodes = fetch_health_nodes(upstream, checker)
end

if #up_nodes._priority_index > 1 then
core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))
Expand Down Expand Up @@ -229,7 +294,12 @@ local function pick_server(route, ctx)
end
end

if checker then
local health_status
if checker and up_conf.type == "chash" then
health_status = fetch_health_status(up_conf, checker, key, version)
end
Comment thread
nic-6443 marked this conversation as resolved.

if checker and up_conf.type ~= "chash" then
version = version .. "#" .. checker.status_ver
end

Expand All @@ -243,10 +313,29 @@ local function pick_server(route, ctx)
return nil, "failed to fetch server picker"
end

local server, err = server_picker.get(ctx)
local server, err
for _ = 1, nodes_count do
server, err = server_picker.get(ctx)
if not server then
err = err or "no valid upstream node"
return nil, "failed to find valid upstream server, " .. err
end

if not health_status or health_status[server] then
break
end

ctx.balancer_server = server
if not server_picker.after_balance then
return nil, "failed to skip unhealthy upstream server: after_balance is unavailable"
end

server_picker.after_balance(ctx, true)
server = nil
end

if not server then
err = err or "no valid upstream node"
return nil, "failed to find valid upstream server, " .. err
return nil, "failed to find valid upstream server, all upstream servers tried"
end
ctx.balancer_server = server

Expand Down
8 changes: 7 additions & 1 deletion apisix/healthcheck_manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,13 @@ function _M.fetch_node_status(checker, ip, port, hostname)
return true
end

return checker:get_target_status(ip, port, hostname)
local ok, err = checker:get_target_status(ip, port, hostname)
if err == "target not found" then
-- Checker targets are created asynchronously, so this means unknown.
return true
end

return ok, err
end


Expand Down
167 changes: 149 additions & 18 deletions apisix/plugins/ai-proxy-multi.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ local pickers = {}
local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
})
local lrucache_health_status = core.lrucache.new({
ttl = 300, count = 256
})

local plugin_name = "ai-proxy-multi"
local _M = {
Expand Down Expand Up @@ -395,14 +398,118 @@ local function fetch_health_instances(conf, checkers)
end


local function fetch_all_instances(conf)
local instances = conf.instances
local new_instances = core.table.new(0, #instances)
for _, ins in ipairs(instances) do
transform_instances(new_instances, ins)
end

return new_instances
end


local function create_health_status(conf, checkers)
local instances = conf.instances
local health_status = core.table.new(0, #instances)
local healthy_dns_nodes = core.table.new(0, #instances)
local has_healthy_instance = false

for _, ins in ipairs(instances) do
local checker = checkers[ins.name]
if checker then
local host = ins.checks and ins.checks.active and ins.checks.active.host
local port = ins.checks and ins.checks.active and ins.checks.active.port
local healthy_nodes = {}

for _, node in ipairs(ins._dns_nodes or {}) do
local ok, err = checker:get_target_status(node.host, port or node.port, host)
if ok then
healthy_nodes[#healthy_nodes + 1] = node
elseif err then
core.log.warn("failed to get health check target status, addr: ",
node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
end
end

if #healthy_nodes > 0 then
healthy_dns_nodes[ins.name] = healthy_nodes
health_status[ins.name] = true
has_healthy_instance = true
else
health_status[ins.name] = false
end
else
health_status[ins.name] = true
has_healthy_instance = true
end
end

if not has_healthy_instance then
core.log.warn("all upstream nodes is unhealthy, use default")
return {all_unhealthy = true}
end

return {
status = health_status,
healthy_dns_nodes = healthy_dns_nodes,
}
end


local function apply_health_status(conf, health_status)
if not health_status or health_status.all_unhealthy then
for _, ins in ipairs(conf.instances) do
ins._healthy_dns_nodes = nil
end

return nil
end

for _, ins in ipairs(conf.instances) do
ins._healthy_dns_nodes = health_status.healthy_dns_nodes[ins.name]
end

return health_status.status
end


local function get_health_status_ver(conf, checkers)
local parts = core.table.new(#conf.instances, 0)
for i, ins in ipairs(conf.instances) do
local checker = checkers[ins.name]
parts[i] = (ins._nodes_ver or 0) .. ":" .. (checker and checker.status_ver or "x")
end

return table_concat(parts, "-")
end


local function fetch_health_status(conf, checkers, key, version)
if not checkers then
return nil
end

local health_status = lrucache_health_status(key, version .. "#" ..
get_health_status_ver(conf, checkers),
create_health_status, conf, checkers)
return apply_health_status(conf, health_status)
end


local function create_server_picker(conf, ups_tab, checkers)
local picker = pickers[conf.balancer.algorithm] -- nil check
if not picker then
pickers[conf.balancer.algorithm] = require("apisix.balancer." .. conf.balancer.algorithm)
picker = pickers[conf.balancer.algorithm]
end

local new_instances = fetch_health_instances(conf, checkers)
local new_instances
if conf.balancer.algorithm == "chash" then
new_instances = fetch_all_instances(conf)
else
new_instances = fetch_health_instances(conf, checkers)
end
core.log.info("fetch health instances: ", core.json.delay_encode(new_instances))

if #new_instances._priority_index > 1 then
Expand Down Expand Up @@ -448,8 +555,13 @@ local function pick_target(ctx, conf, ups_tab)
end
end

local version = plugin.conf_version(conf) .. "#" ..
get_checkers_status_ver(conf, checkers)
local health_status
local version = plugin.conf_version(conf)
if conf.balancer.algorithm == "chash" then
health_status = fetch_health_status(conf, checkers, ctx.matched_route.key, version)
else
version = version .. "#" .. get_checkers_status_ver(conf, checkers)
end
Comment thread
nic-6443 marked this conversation as resolved.

local server_picker = ctx.server_picker
if not server_picker then
Expand All @@ -461,29 +573,48 @@ local function pick_target(ctx, conf, ups_tab)
end
ctx.server_picker = server_picker

local instance_name, err = server_picker.get(ctx)
if err then
return nil, nil, err
local ai_rate_limiting
local check_rate_limiting = conf.fallback_strategy == "instance_health_and_rate_limiting" or
fallback_strategy_has(conf.fallback_strategy, "rate_limiting")
if check_rate_limiting then
ai_rate_limiting = require("apisix.plugins.ai-rate-limiting")
end
ctx.balancer_server = instance_name
if conf.fallback_strategy == "instance_health_and_rate_limiting" or -- for backwards compatible
fallback_strategy_has(conf.fallback_strategy, "rate_limiting") then
local ai_rate_limiting = require("apisix.plugins.ai-rate-limiting")
for _ = 1, #conf.instances do
if ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then

local instance_name, err
for _ = 1, #conf.instances do
instance_name, err = server_picker.get(ctx)
if err then
return nil, nil, err
end

if not health_status or health_status[instance_name] then
if not check_rate_limiting or
ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then
break
end
core.log.warn("ai instance: ", instance_name,
" is not available, try to pick another one")
server_picker.after_balance(ctx, true)
instance_name, err = server_picker.get(ctx)
if err then
return nil, nil, err
end
ctx.balancer_server = instance_name

else
core.log.warn("ai instance: ", instance_name,
" is unhealthy, try to pick another one")
end

ctx.balancer_server = instance_name
if not server_picker.after_balance then
return nil, nil, "failed to skip AI instance: after_balance is unavailable"
end

server_picker.after_balance(ctx, true)
instance_name = nil
end

if not instance_name then
return nil, nil, "all servers tried"
end

ctx.balancer_server = instance_name

local instance_conf = get_instance_conf(conf.instances, instance_name)
local nodes = instance_conf._healthy_dns_nodes or instance_conf._dns_nodes
use_node_for_request(instance_conf, pick_request_node(nodes))
Expand Down
Loading
Loading