in t/kubernetes/discovery/kubernetes3.t [88:232]
add_block_preprocessor(sub {
my ($block) = @_;
my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_
$block->set_value("apisix_yaml", $apisix_yaml);
my $main_config = $block->main_config // <<_EOC_;
env KUBERNETES_SERVICE_HOST=127.0.0.1;
env KUBERNETES_SERVICE_PORT=6443;
env KUBERNETES_CLIENT_TOKEN=$::token_value;
env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
_EOC_
$block->set_value("main_config", $main_config);
my $config = $block->config // <<_EOC_;
location /queries {
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.kubernetes")
ngx.sleep(1)
ngx.req.read_body()
local request_body = ngx.req.get_body_data()
local queries = core.json.decode(request_body)
local response_body = "{"
for _,query in ipairs(queries) do
local nodes = d.nodes(query)
if nodes==nil or #nodes==0 then
response_body=response_body.." "..0
else
response_body=response_body.." "..#nodes
end
end
ngx.say(response_body.." }")
}
}
location /operators {
content_by_lua_block {
local http = require("resty.http")
local core = require("apisix.core")
local ipairs = ipairs
ngx.req.read_body()
local request_body = ngx.req.get_body_data()
local operators = core.json.decode(request_body)
core.log.info("get body ", request_body)
core.log.info("get operators ", #operators)
for _, op in ipairs(operators) do
local method, path, body
local headers = {
["Host"] = "127.0.0.1:6445"
}
if op.op == "replace_endpointslices" then
method = "PATCH"
path = "/apis/discovery.k8s.io/namespaces/" .. op.namespace .. "/endpointslices/" .. op.name
if #op.endpoints == 0 then
body = '[{"path":"/endpoints","op":"replace","value":[]}]'
else
local t = { { op = "replace", path = "/endpoints", value = op.endpoints } }
body = core.json.encode(t, true)
end
headers["Content-Type"] = "application/json-patch+json"
end
if op.op == "replace_labels" then
method = "PATCH"
path = "/apis/discovery.k8s.io/namespaces/" .. op.namespace .. "/endpointslices/" .. op.name
local t = { { op = "replace", path = "/metadata/labels", value = op.labels } }
body = core.json.encode(t, true)
headers["Content-Type"] = "application/json-patch+json"
end
local httpc = http.new()
core.log.info("begin to connect ", "127.0.0.1:6445")
local ok, message = httpc:connect({
scheme = "http",
host = "127.0.0.1",
port = 6445,
})
if not ok then
core.log.error("connect 127.0.0.1:6445 failed, message : ", message)
ngx.say("FAILED")
end
local res, err = httpc:request({
method = method,
path = path,
headers = headers,
body = body,
})
if err ~= nil then
core.log.err("operator k8s cluster error: ", err)
return 500
end
ngx.sleep(1)
local k8s = require("apisix.discovery.kubernetes")
local data = k8s.dump_data()
ngx.say(core.json.encode(data,true))
if res.status ~= 200 and res.status ~= 201 and res.status ~= 409 then
return res.status
end
end
ngx.say("DONE")
}
}
location /dump {
content_by_lua_block {
local json_decode = require("toolkit.json").decode
local core = require("apisix.core")
local http = require "resty.http"
local httpc = http.new()
ngx.sleep(1)
local dump_uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/v1/discovery/kubernetes/dump"
local res, err = httpc:request_uri(dump_uri, { method = "GET"})
if err then
ngx.log(ngx.ERR, err)
ngx.status = res.status
return
end
local body = json_decode(res.body)
local endpoints = body.endpoints
ngx.say(core.json.encode(endpoints,true))
}
}
_EOC_
$block->set_value("config", $config);
});