lib/shenyu/register/nacos.lua (182 lines of code) (raw):

-- -- Licensed to the Apache Software Foundation (ASF) under one or more -- contributor license agreements. See the NOTICE file distributed with -- this work for additional information regarding copyright ownership. -- The ASF licenses this file to You under the Apache License, Version 2.0 -- (the "License"); you may not use this file except in compliance with -- the License. You may obtain a copy of the License at -- -- http://www.apache.org/licenses/LICENSE-2.0 -- -- Unless required by applicable law or agreed to in writing, software -- distributed under the License is distributed on an "AS IS" BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. -- local _M = {} local http = require("resty.http") local json = require("cjson.safe") local ngx_balancer = require("ngx.balancer") local balancer = require("shenyu.register.balancer") local ngx = ngx local ngx_timer_at = ngx.timer.at local ngx_worker_exiting = ngx.worker.exiting local log = ngx.log local ERR = ngx.ERR local INFO = ngx.INFO _M.access_token = nil local function login() local httpc = http.new() local res, err = httpc:request_uri(_M.nacos_base_url, { method = "POST", path = "/nacos/v1/auth/login", query = "username=" .. _M.username .. "&password=" .. _M.password, }) if not res then return nil, err end if res.status >= 400 then return nil, res.body end log(INFO, "login nacos in username: '" .. _M.username .. "' successfully.") return json.decode(res.body).accessToken end local function get_server_list() local httpc = http.new() local res, err = httpc:request_uri(_M.nacos_base_url, { method = "GET", path = "/nacos/v1/ns/instance/list", query = "serviceName=" .. _M.service_name .. "&groupName=" .. _M.group_name .. "&namespaceId=" .. _M.namespace .. "&clusters=" .. _M.clusters .. "&healthOnly=true" .. "accessToken=" .. _M.access_token }) if not res then log(ERR, "failed to get server list from nacos. ", err) return end if res.status == 200 then local server_list = {} local list_inst_resp = json.decode(res.body) local hosts = list_inst_resp.hosts if not hosts then return {}, 0, 0 end for _, inst in pairs(hosts) do local key = inst.ip .. ":" .. inst.port server_list[key] = inst.weight end return server_list, list_inst_resp.lastRefTime, #hosts end log(ERR, res.body) return end local function subscribe(premature, initialized) if premature or ngx_worker_exiting() then return end if not initialized then local token, err = login() if not token then log(ERR, err) goto continue end _M.access_token = token local server_list, revision, servers_length = get_server_list() if not server_list or servers_length == 0 then goto continue end _M.balancer:init(server_list) _M.revision = revision _M.servers_length = servers_length local server_list_in_json = json.encode(server_list) log(INFO, "initialize upstream: " .. server_list_in_json .. " , revision: " .. revision) _M.storage:set("server_list", server_list_in_json) _M.storage:set("revision", revision) initialized = true else local server_list, revision, servers_length = get_server_list() if not server_list or servers_length == 0 then goto continue end local updated = true if _M.servers_length == servers_length then local services = _M.server_list for srv, weight in pairs(server_list) do if services[srv] ~= weight then break end end updated = false end if not updated then goto continue end _M.balancer:reinit(server_list) _M.revision = revision _M.server_list = server_list local server_list_in_json = json.encode(server_list) log(INFO, "update upstream: " .. server_list_in_json .. " , revision: " .. revision) _M.storage:set("server_list", server_list_in_json) _M.storage:set("revision", revision) _M.servers_length = servers_length end :: continue :: local ok, err = ngx_timer_at(2, subscribe, initialized) if not ok then log(ERR, "failed to subscribe: ", err) end return end local function sync(premature) if premature or ngx_worker_exiting() then return end local storage = _M.storage local ver = storage:get("revision") if ver > _M.revision then local server_list = storage:get("server_list") local servers = json.decode(server_list) if _M.revision < 1 then _M.balancer:init(servers) log(INFO, "initialize upstream in workers, upstream: " .. server_list) else _M.balancer:reinit(servers) log(INFO, "update upstream in workers, upstream: " .. server_list) end _M.revision = ver end local ok, err = ngx_timer_at(1, sync) if not ok then log(ERR, "failed to start sync: ", err) end end -- conf = { -- balance_type = "chash", -- nacos_base_url = "http://127.0.0.1:8848", -- username = "nacos", -- password = "nacos", -- namespace = "", -- service_name = "", -- group_name = "", -- clusters = "", -- } function _M.init(conf) _M.storage = conf.shenyu_storage _M.balancer = balancer.new(conf.balancer_type) _M.revision = 0 if ngx.worker.id() == 0 then _M.nacos_base_url = conf.nacos_base_url _M.username = conf.username _M.password = conf.password _M.namespace = conf.namespace _M.group_name = conf.group_name _M.service_name = conf.service_name _M.clusters = conf.clusters if not conf.clusters then _M.clusters = "" end if not conf.namespace then _M.namespace = "" end if not conf.group_name then _M.group_name = "DEFAULT_GROUP" end if not conf.service_name then _M.service_name = "shenyu-instances" end _M.server_list = {} _M.storage:set("revision", 0) -- subscribed by polling, privileged local ok, err = ngx_timer_at(0, subscribe) if not ok then log(ERR, "failed to start watch: " .. err) end return end -- synchronize server_list from privileged processor to workers local ok, err = ngx_timer_at(2, sync) if not ok then log(ERR, "failed to start sync ", err) end end function _M.pick_and_set_peer(key) local server = _M.balancer:find(key) ngx_balancer.set_current_peer(server) end return _M