Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 74 additions & 12 deletions apisix/discovery/kubernetes/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,21 @@ end


local function post_list(handle)
if not handle.existing_keys or not handle.current_keys_hash then
return
end
for _, key in ipairs(handle.existing_keys) do
if not handle.current_keys_hash[key] then
core.log.info("kubernetes discovery module find dirty data in shared dict, key:", key)
handle.endpoint_dict:delete(key)
if handle.existing_keys and handle.current_keys_hash then
for _, key in ipairs(handle.existing_keys) do
if not handle.current_keys_hash[key] then
core.log.info("kubernetes discovery module found dirty data in shared dict, key: ",
key)
handle.endpoint_dict:delete(key)
end
end
handle.existing_keys = nil
handle.current_keys_hash = nil
end
local _, err = handle.endpoint_dict:safe_set("discovery_ready", true)
if err then
core.log.error("set discovery_ready flag into discovery DICT failed, ", err)
end
handle.existing_keys = nil
handle.current_keys_hash = nil
end


Expand Down Expand Up @@ -436,18 +440,24 @@ local function start_fetch(handle)
ngx.timer.at(0, timer_runner)
end

local function get_endpoint_dict(id)

local function get_endpoint_dict_name(id)
local shm = "kubernetes"

if id and #id > 0 then
if id and type(id) == "string" and #id > 0 then
shm = shm .. "-" .. id
end

if not is_http then
shm = shm .. "-stream"
end
return shm
end


return ngx.shared[shm]
local function get_endpoint_dict(id)
local dict_name = get_endpoint_dict_name(id)
return ngx.shared[dict_name]
end


Expand Down Expand Up @@ -684,6 +694,7 @@ local function dump_endpoints_from_dict(endpoint_dict)
return endpoints
end


function _M.dump_data()
local discovery_conf = local_conf.discovery.kubernetes
local eps = {}
Expand Down Expand Up @@ -715,4 +726,55 @@ function _M.dump_data()
end


local function check_ready(id)
local endpoint_dict = get_endpoint_dict(id)
if not endpoint_dict then
core.log.error("failed to get lua_shared_dict:", get_endpoint_dict_name(id),
", please check your APISIX version")
return false, "failed to get lua_shared_dict: " .. get_endpoint_dict_name(id)
.. ", please check your APISIX version"
end
-- check flag
local ready = endpoint_dict:get("discovery_ready")
if not ready then
core.log.warn("kubernetes discovery not ready")
return false, "kubernetes discovery not ready"
end
return true
end


local function single_mode_check_discovery_ready()
local _, err = check_ready()
if err then
return false, err
end
return true
end


local function multiple_mode_check_discovery_ready(confs)
for _, conf in ipairs(confs) do
local _, err = check_ready(conf.id)
if err then
return false, err
end
end
return true
end


function _M.check_discovery_ready()
local discovery_conf = local_conf.discovery and local_conf.discovery.kubernetes
if not discovery_conf then
return true
end
if #discovery_conf == 0 then
return single_mode_check_discovery_ready()
else
return multiple_mode_check_discovery_ready(discovery_conf)
end
end


return _M
105 changes: 64 additions & 41 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local resource = require("apisix.resource")
local trusted_addresses_util = require("apisix.utils.trusted-addresses")
local discovery = require("apisix.discovery.init").discovery
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
Expand Down Expand Up @@ -121,7 +122,6 @@ function _M.http_init_worker()

core.lrucache.init_worker()

local discovery = require("apisix.discovery.init").discovery
if discovery and discovery.init_worker then
discovery.init_worker()
end
Expand Down Expand Up @@ -976,54 +976,78 @@ function _M.status()
core.response.exit(200, core.json.encode({ status = "ok" }))
end

function _M.status_ready()
local local_conf = core.config.local_conf()
local role = core.table.try_read_attr(local_conf, "deployment", "role")
local provider = core.table.try_read_attr(local_conf, "deployment", "role_" ..
role, "config_provider")
if provider == "yaml" or provider == "etcd" then
local status_shdict = ngx.shared["status-report"]
local ids = status_shdict:get_keys()
local error
local worker_count = ngx.worker.count()
if #ids ~= worker_count then
core.log.warn("worker count: ", worker_count, " but status report count: ", #ids)
error = "worker count: " .. ngx.worker.count() ..
" but status report count: " .. #ids
end
if error then
core.response.exit(503, core.json.encode({
status = "error",
error = error
}))
return
end
for _, id in ipairs(ids) do
local ready = status_shdict:get(id)

local function discovery_ready_check()
local discovery_type = local_conf.discovery
if not discovery_type then
return true
end
for discovery_name, _ in pairs(discovery_type) do
local dis_module = discovery[discovery_name]
if dis_module.check_discovery_ready then
local ready, message = dis_module.check_discovery_ready()
if not ready then
core.log.warn("worker id: ", id, " has not received configuration")
error = "worker id: " .. id ..
" has not received configuration"
break
return false, message
end
end
end
return true
end

if error then
core.response.exit(503, core.json.encode({
status = "error",
error = error
}))
return
local function config_ready_check()
local role = core.table.try_read_attr(local_conf, "deployment", "role")
local provider = core.table.try_read_attr(local_conf, "deployment",
"role_" .. role, "config_provider")
if provider ~= "yaml" and provider ~= "etcd" then
return false, "unknown config provider: " .. tostring(provider)
end

local status_shdict = ngx.shared["status-report"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may get nil, need to check it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if not status_shdict then
core.log.error("failed to get ngx.shared dict status-report")
return false, "failed to get ngx.shared dict status-report"
end
local ids = status_shdict:get_keys()

local worker_count = ngx.worker.count()
if #ids ~= worker_count then
local error = "worker count: " .. worker_count .. " but status report count: " .. #ids
core.log.error(error)
return false, error
end
for _, id in ipairs(ids) do
local ready = status_shdict:get(id)
if not ready then
local error = "worker id: " .. id .. " has not received configuration"
core.log.error(error)
return false, error
end
end

return true
end

core.response.exit(200, core.json.encode({ status = "ok" }))
function _M.status_ready()
local ready, message = config_ready_check()
if not ready then
core.response.exit(503, core.json.encode({
status = "error",
error = message
}))
return
end

core.response.exit(503, core.json.encode({
status = "error",
message = "unknown config provider: " .. tostring(provider)
}), { ["Content-Type"] = "application/json" })
ready, message = discovery_ready_check()
if not ready then
core.response.exit(503, core.json.encode({
status = "error",
error = message
}))
return
end

core.response.exit(200, core.json.encode({ status = "ok" }))
return
end


Expand Down Expand Up @@ -1182,7 +1206,6 @@ function _M.stream_init_worker()
-- for admin api of standalone mode, we need to startup background timer and patch schema etc.
require("apisix.admin.init").init_worker()

local discovery = require("apisix.discovery.init").discovery
if discovery and discovery.init_worker then
discovery.init_worker()
end
Expand Down
Loading
Loading