in apisix/discovery/consul/init.lua [362:573]
--- Run one watch-and-refresh cycle against a single consul server.
--
-- Steps, in order:
--   1. spawn the `watch_catalog` and `watch_health` threads and wait on them;
--   2. validate the watch result against the stored catalog/health indexes;
--   3. fetch the catalog and health watch URLs;
--   4. when either `X-Consul-Index` header differs from the stored index,
--      rebuild the node list of every non-skipped service, publish it
--      (`update_all_services` + `events:post`), optionally schedule a dump
--      and store the new indexes via `update_index`.
--
-- Every exit path except the `premature` one ends with
-- `check_keepalive(consul_server, retry_delay)`.
--
-- NOTE(review): the `premature` first argument suggests this is used as an
-- ngx timer callback -- confirm against the caller.
--
-- @param premature      when truthy the function returns immediately
-- @param consul_server  table describing the server; fields read here: host,
--                       port, connect_timeout, read_timeout, token,
--                       consul_server_url, consul_watch_catalog_url,
--                       consul_watch_health_url, consul_sub_url,
--                       catalog_index, health_index, index
-- @param retry_delay    current back-off value; replaced by
--                       `get_retry_delay(retry_delay)` on watch/fetch errors
--                       and forwarded to `check_keepalive`
function _M.connect(premature, consul_server, retry_delay)
    if premature then
        return
    end

    -- start both watchers; a spawn failure backs off for a random delay
    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server)
    if not catalog_thread then
        local random_delay = math_random(default_random_range)
        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
            ", retry connecting consul after ", random_delay, " seconds")
        core_sleep(random_delay)
        check_keepalive(consul_server, retry_delay)
        return
    end

    local health_thread, err = thread_spawn(watch_health, consul_server)
    if not health_thread then
        -- do not leave the catalog watcher running on its own
        thread_kill(catalog_thread)
        local random_delay = math_random(default_random_range)
        log.error("failed to spawn thread watch health: ", err,
            ", retry connecting consul after ",
            random_delay, " seconds")
        core_sleep(random_delay)
        check_keepalive(consul_server, retry_delay)
        return
    end

    -- wait on both watchers, then kill both regardless of the outcome
    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, health_thread)
    thread_kill(catalog_thread)
    thread_kill(health_thread)
    if not thread_wait_ok then
        local random_delay = math_random(default_random_range)
        log.error("failed to wait thread: ", watch_type,
            ", retry connecting consul after ",
            random_delay, " seconds")
        core_sleep(random_delay)
        check_keepalive(consul_server, retry_delay)
        return
    end

    if not watch_result_is_valid(tonumber(watch_type), tonumber(index),
                                 consul_server.catalog_index,
                                 consul_server.health_index) then
        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ",
            retry_delay, " seconds")
        core_sleep(retry_delay)
        check_keepalive(consul_server, retry_delay)
        return
    end

    local consul_client = resty_consul:new({
        host = consul_server.host,
        port = consul_server.port,
        connect_timeout = consul_server.connect_timeout,
        read_timeout = consul_server.read_timeout,
        default_args = {
            token = consul_server.token
        }
    })

    -- fetch the catalog; pcall guards against errors raised by the client
    local catalog_success, catalog_res, catalog_err = pcall(function()
        return consul_client:get(consul_server.consul_watch_catalog_url)
    end)
    if not catalog_success then
        -- on pcall failure `catalog_res` holds the raised error value
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got catalog result: ", json_delay_encode(catalog_res))
        check_keepalive(consul_server, retry_delay)
        return
    end

    -- either the returned error or a non-200 status counts as a failure
    local catalog_error_info = (catalog_err ~= nil and catalog_err)
            or ((catalog_res ~= nil and catalog_res.status ~= 200)
            and catalog_res.status)
    if catalog_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got catalog result: ", json_delay_encode(catalog_res),
            ", with error: ", catalog_error_info)
        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ",
            retry_delay, " seconds")
        core_sleep(retry_delay)
        check_keepalive(consul_server, retry_delay)
        return
    end

    -- fetch the health watch URL with the same failure handling
    local success, health_res, health_err = pcall(function()
        return consul_client:get(consul_server.consul_watch_health_url)
    end)
    if not success then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got health result: ", json_delay_encode(health_res))
        check_keepalive(consul_server, retry_delay)
        return
    end

    local health_error_info = (health_err ~= nil and health_err)
            or ((health_res ~= nil and health_res.status ~= 200)
            and health_res.status)
    if health_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got health result: ", json_delay_encode(health_res),
            ", with error: ", health_error_info)
        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ",
            retry_delay, " seconds")
        core_sleep(retry_delay)
        check_keepalive(consul_server, retry_delay)
        return
    end

    log.info("connect consul: ", consul_server.consul_server_url,
        ", catalog_result status: ", catalog_res.status,
        ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'],
        ", consul_server.index: ", consul_server.index,
        ", consul_server: ", json_delay_encode(consul_server))

    -- rebuild the service table only when one of the two indexes moved
    if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index']))
            or (consul_server.health_index ~= tonumber(health_res.headers['X-Consul-Index'])) then
        local up_services = core.table.new(0, #catalog_res.body)
        for service_name, _ in pairs(catalog_res.body) do
            if skip_service_map[service_name] then
                goto CONTINUE
            end

            local svc_url = consul_server.consul_sub_url .. "/" .. service_name
            local svc_success, result, get_err = pcall(function()
                return consul_client:get(svc_url, {passing = true})
            end)

            -- a failed pcall leaves the raised error in `result`; report it
            -- directly instead of treating it as a response object
            if not svc_success then
                log.error("connect consul: ", consul_server.consul_server_url,
                    ", by service url: ", svc_url,
                    ", with error: ", json_delay_encode(result))
                goto CONTINUE
            end

            local error_info = (get_err ~= nil and get_err) or
                    ((result ~= nil and result.status ~= 200) and result.status)
            if error_info then
                log.error("connect consul: ", consul_server.consul_server_url,
                    ", by service url: ", svc_url, ", with error: ", error_info)
                goto CONTINUE
            end

            if is_not_empty(result.body) then
                local nodes = up_services[service_name]
                -- "host:port" keys already added for this service
                local nodes_uniq = {}
                for _, node in ipairs(result.body) do
                    -- skip only this entry: jumping to CONTINUE from here
                    -- would leave the node loop and drop the remaining
                    -- nodes (and the sort) of the whole service
                    if not node.Service then
                        goto CONTINUE_NODE
                    end

                    local svc_address, svc_port = node.Service.Address, node.Service.Port
                    -- create the node list lazily on the first usable entry
                    if not nodes then
                        nodes = core.table.new(1, 0)
                        up_services[service_name] = nodes
                    end

                    local service_id = svc_address .. ":" .. svc_port
                    if not nodes_uniq[service_id] then
                        core.table.insert(nodes, {
                            host = svc_address,
                            port = tonumber(svc_port),
                            weight = default_weight,
                        })
                        nodes_uniq[service_id] = true
                    end

                    :: CONTINUE_NODE ::
                end

                if nodes then
                    if sort_type == "port_sort" then
                        core.table.sort(nodes, port_sort_nodes_cmp)
                    elseif sort_type == "host_sort" then
                        core.table.sort(nodes, host_sort_nodes_cmp)
                    elseif sort_type == "combine_sort" then
                        core.table.sort(nodes, combine_sort_nodes_cmp)
                    end
                end
                up_services[service_name] = nodes
            end

            :: CONTINUE ::
        end

        update_all_services(consul_server.consul_server_url, up_services)

        -- publish the refreshed services through the events channel
        local post_ok, post_err = events:post(events_list._source,
            events_list.updating, all_services)
        if not post_ok then
            log.error("post_event failure with ", events_list._source,
                ", update all services error: ", post_err)
        end

        if dump_params then
            ngx_timer_at(0, write_dump_services)
        end

        update_index(consul_server,
            catalog_res.headers['X-Consul-Index'],
            health_res.headers['X-Consul-Index'])
    end

    check_keepalive(consul_server, retry_delay)
end