in t/kubernetes/discovery/stream/kubernetes.t [62:185]
add_block_preprocessor(sub {
my ($block) = @_;
my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_
$block->set_value("apisix_yaml", $apisix_yaml);
my $main_config = $block->main_config // <<_EOC_;
env KUBERNETES_SERVICE_HOST=127.0.0.1;
env KUBERNETES_SERVICE_PORT=6443;
env KUBERNETES_CLIENT_TOKEN=$::token_value;
env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
_EOC_
$block->set_value("main_config", $main_config);
my $config = $block->config // <<_EOC_;
location /operators {
content_by_lua_block {
local http = require("resty.http")
local core = require("apisix.core")
local ipairs = ipairs
ngx.req.read_body()
local request_body = ngx.req.get_body_data()
local operators = core.json.decode(request_body)
core.log.info("get body ", request_body)
core.log.info("get operators ", #operators)
for _, op in ipairs(operators) do
local method, path, body
local headers = {
["Host"] = "127.0.0.1:6445"
}
if op.op == "replace_subsets" then
method = "PATCH"
path = "/api/v1/namespaces/" .. op.namespace .. "/endpoints/" .. op.name
if #op.subsets == 0 then
body = '[{"path":"/subsets","op":"replace","value":[]}]'
else
local t = { { op = "replace", path = "/subsets", value = op.subsets } }
body = core.json.encode(t, true)
end
headers["Content-Type"] = "application/json-patch+json"
end
if op.op == "replace_labels" then
method = "PATCH"
path = "/api/v1/namespaces/" .. op.namespace .. "/endpoints/" .. op.name
local t = { { op = "replace", path = "/metadata/labels", value = op.labels } }
body = core.json.encode(t, true)
headers["Content-Type"] = "application/json-patch+json"
end
local httpc = http.new()
core.log.info("begin to connect ", "127.0.0.1:6445")
local ok, message = httpc:connect({
scheme = "http",
host = "127.0.0.1",
port = 6445,
})
if not ok then
core.log.error("connect 127.0.0.1:6445 failed, message : ", message)
ngx.say("FAILED")
end
local res, err = httpc:request({
method = method,
path = path,
headers = headers,
body = body,
})
if err ~= nil then
core.log.err("operator k8s cluster error: ", err)
return 500
end
if res.status ~= 200 and res.status ~= 201 and res.status ~= 409 then
return res.status
end
end
ngx.say("DONE")
}
}
_EOC_
$block->set_value("config", $config);
my $stream_config = $block->stream_config // <<_EOC_;
server {
listen 8125;
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.kubernetes")
ngx.sleep(1)
local sock = ngx.req.socket()
local request_body = sock:receive()
core.log.info("get body ", request_body)
local response_body = "{"
local queries = core.json.decode(request_body)
for _,query in ipairs(queries) do
local nodes = d.nodes(query)
if nodes==nil or #nodes==0 then
response_body=response_body.." "..0
else
response_body=response_body.." "..#nodes
end
end
ngx.say(response_body.." }")
}
}
_EOC_
$block->set_value("extra_stream_config", $stream_config);
});