function _M.connect()

in apisix/discovery/consul/init.lua [362:573]


function _M.connect(premature, consul_server, retry_delay)
    if premature then
        return
    end

    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server)
    if not catalog_thread then
        local random_delay = math_random(default_random_range)
        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
            ", retry connecting consul after ", random_delay, " seconds")
        core_sleep(random_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    local health_thread, err = thread_spawn(watch_health, consul_server)
    if not health_thread then
        thread_kill(catalog_thread)
        local random_delay = math_random(default_random_range)
        log.error("failed to spawn thread watch health: ", err, ", retry connecting consul after ",
            random_delay, " seconds")
        core_sleep(random_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, health_thread)
    thread_kill(catalog_thread)
    thread_kill(health_thread)
    if not thread_wait_ok then
        local random_delay = math_random(default_random_range)
        log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
                random_delay, " seconds")
        core_sleep(random_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    
    if not watch_result_is_valid(tonumber(watch_type),
            tonumber(index), consul_server.catalog_index, consul_server.health_index) then
        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
        core_sleep(retry_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    local consul_client = resty_consul:new({
        host = consul_server.host,
        port = consul_server.port,
        connect_timeout = consul_server.connect_timeout,
        read_timeout = consul_server.read_timeout,
        default_args = {
            token = consul_server.token
        }
    })
    local catalog_success, catalog_res, catalog_err = pcall(function()
        return consul_client:get(consul_server.consul_watch_catalog_url)
    end)
    if not catalog_success then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got catalog result: ", json_delay_encode(catalog_res))
        check_keepalive(consul_server, retry_delay)
        return
    end
    local catalog_error_info = (catalog_err ~= nil and catalog_err)
            or ((catalog_res ~= nil and catalog_res.status ~= 200)
            and catalog_res.status)
    if catalog_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got catalog result: ", json_delay_encode(catalog_res),
            ", with error: ", catalog_error_info)

        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
        core_sleep(retry_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    
    local success, health_res, health_err = pcall(function()
        return consul_client:get(consul_server.consul_watch_health_url)
    end)
    if not success then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got health result: ", json_delay_encode(health_res))
        check_keepalive(consul_server, retry_delay)
        return
    end
    local health_error_info = (health_err ~= nil and health_err)
            or ((health_res ~= nil and health_res.status ~= 200)
            and health_res.status)
    if health_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got health result: ", json_delay_encode(health_res),
            ", with error: ", health_error_info)

        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
        core_sleep(retry_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    log.info("connect consul: ", consul_server.consul_server_url,
        ", catalog_result status: ", catalog_res.status,
        ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'],
        ", consul_server.index: ", consul_server.index,
        ", consul_server: ", json_delay_encode(consul_server))

    
    if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index']))
            or (consul_server.health_index ~= tonumber(health_res.headers['X-Consul-Index'])) then
        local up_services = core.table.new(0, #catalog_res.body)
        for service_name, _ in pairs(catalog_res.body) do
            
            if skip_service_map[service_name] then
                goto CONTINUE
            end

            
            local svc_url = consul_server.consul_sub_url .. "/" .. service_name
            local svc_success, result, get_err = pcall(function()
                return consul_client:get(svc_url, {passing = true})
            end)
            local error_info = (get_err ~= nil and get_err) or
                    ((result ~= nil and result.status ~= 200) and result.status)
            if not svc_success or error_info then
                log.error("connect consul: ", consul_server.consul_server_url,
                    ", by service url: ", svc_url, ", with error: ", error_info)
                goto CONTINUE
            end

            
            
            if is_not_empty(result.body) then
                
                local nodes = up_services[service_name]
                local nodes_uniq = {}
                for _, node in ipairs(result.body) do
                    if not node.Service then
                        goto CONTINUE
                    end

                    local svc_address, svc_port = node.Service.Address, node.Service.Port
                    
                    if not nodes then
                        nodes = core.table.new(1, 0)
                        up_services[service_name] = nodes
                    end
                    
                    local service_id = svc_address .. ":" .. svc_port
                    if not nodes_uniq[service_id] then
                        
                        core.table.insert(nodes, {
                            host = svc_address,
                            port = tonumber(svc_port),
                            weight = default_weight,
                        })
                        nodes_uniq[service_id] = true
                    end
                end
                if nodes then
                    if sort_type == "port_sort" then
                        core.table.sort(nodes, port_sort_nodes_cmp)

                    elseif sort_type == "host_sort" then
                        core.table.sort(nodes, host_sort_nodes_cmp)

                    elseif sort_type == "combine_sort" then
                        core.table.sort(nodes, combine_sort_nodes_cmp)

                    end
                end
                up_services[service_name] = nodes
            end
            :: CONTINUE ::
        end

        update_all_services(consul_server.consul_server_url, up_services)

        
        local post_ok, post_err = events:post(events_list._source,
                events_list.updating, all_services)
        if not post_ok then
            log.error("post_event failure with ", events_list._source,
                ", update all services error: ", post_err)
        end

        if dump_params then
            ngx_timer_at(0, write_dump_services)
        end

        update_index(consul_server,
                catalog_res.headers['X-Consul-Index'],
                health_res.headers['X-Consul-Index'])
    end

    check_keepalive(consul_server, retry_delay)
end