add_block_preprocessor()

in t/kubernetes/discovery/kubernetes3.t [88:232]


# Register a preprocessor that fills in per-block defaults: whichever of
# apisix_yaml, main_config and config a test block leaves undefined is
# replaced by the fallback heredoc below.
#
# NOTE: the heredocs are interpolating (unquoted _EOC_), so any dollar or
# at sign placed inside them is expanded by Perl before nginx sees it.
add_block_preprocessor(sub {
    my ($block) = @_;

    # Fallback apisix.yaml: an empty route list.
    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_

    $block->set_value("apisix_yaml", $apisix_yaml);

    # Fallback main_config: exports the API server address and the client
    # token settings as environment variables; the token values come from
    # the package-level variables $::token_value and $::token_file.
    my $main_config = $block->main_config // <<_EOC_;
env KUBERNETES_SERVICE_HOST=127.0.0.1;
env KUBERNETES_SERVICE_PORT=6443;
env KUBERNETES_CLIENT_TOKEN=$::token_value;
env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
_EOC_

    $block->set_value("main_config", $main_config);

    # Fallback server config: three helper locations used by the tests.
    my $config = $block->config // <<_EOC_;
        # /queries: the request body is a JSON array of service names; the
        # reply lists the node count of each one, in order.
        location /queries {
            content_by_lua_block {
              local core = require("apisix.core")
              local d = require("apisix.discovery.kubernetes")

              -- presumably gives the discovery module time to sync; TODO confirm
              ngx.sleep(1)

              ngx.req.read_body()
              local request_body = ngx.req.get_body_data()
              local queries = core.json.decode(request_body)
              local response_body = "{"
              for _, query in ipairs(queries) do
                local nodes = d.nodes(query)
                -- a missing node list is reported the same way as an empty one
                local count = nodes and #nodes or 0
                response_body = response_body .. " " .. count
              end
              ngx.say(response_body .. " }")
            }
        }

        # /operators: the request body is a JSON array of operations; each
        # one is sent as a JSON patch to 127.0.0.1:6445 and followed by a
        # dump of the discovery data.
        location /operators {
            content_by_lua_block {
                local http = require("resty.http")
                local core = require("apisix.core")
                local ipairs = ipairs

                ngx.req.read_body()
                local request_body = ngx.req.get_body_data()
                local operators = core.json.decode(request_body)

                core.log.info("get body ", request_body)
                core.log.info("get operators ", #operators)
                for _, op in ipairs(operators) do
                    local method, path, body
                    local headers = {
                        ["Host"] = "127.0.0.1:6445"
                    }

                    if op.op == "replace_endpointslices" then
                        method = "PATCH"
                        path = "/apis/discovery.k8s.io/namespaces/" .. op.namespace .. "/endpointslices/" .. op.name
                        if #op.endpoints == 0 then
                            -- literal patch for the empty case; presumably avoids the
                            -- encoder emitting an object for an empty table (TODO confirm)
                            body = '[{"path":"/endpoints","op":"replace","value":[]}]'
                        else
                            local t = { { op = "replace", path = "/endpoints", value = op.endpoints } }
                            body = core.json.encode(t, true)
                        end
                        headers["Content-Type"] = "application/json-patch+json"
                    end

                    if op.op == "replace_labels" then
                        method = "PATCH"
                        path = "/apis/discovery.k8s.io/namespaces/" .. op.namespace .. "/endpointslices/" .. op.name
                        local t = { { op = "replace", path = "/metadata/labels", value = op.labels } }
                        body = core.json.encode(t, true)
                        headers["Content-Type"] = "application/json-patch+json"
                    end

                    local httpc = http.new()
                    core.log.info("begin to connect ", "127.0.0.1:6445")
                    local ok, message = httpc:connect({
                        scheme = "http",
                        host = "127.0.0.1",
                        port = 6445,
                    })
                    if not ok then
                        core.log.error("connect 127.0.0.1:6445 failed, message : ", message)
                        ngx.say("FAILED")
                        -- stop here: no request can be sent without a connection
                        return
                    end
                    local res, err = httpc:request({
                        method = method,
                        path = path,
                        headers = headers,
                        body = body,
                    })
                    if err ~= nil then
                        -- use the same error logger as the connect failure path
                        core.log.error("operator k8s cluster error: ", err)
                        return 500
                    end

                    -- presumably waits for the change to reach the discovery data; TODO confirm
                    ngx.sleep(1)

                    local k8s = require("apisix.discovery.kubernetes")
                    local data = k8s.dump_data()
                    ngx.say(core.json.encode(data,true))

                    -- 200, 201 and 409 are accepted; any other status ends the run
                    -- before DONE is printed
                    if res.status ~= 200 and res.status ~= 201 and res.status ~= 409 then
                        return res.status
                    end
                end
                ngx.say("DONE")
            }
        }

        # /dump: fetches the discovery dump endpoint of this same server and
        # replies with only its endpoints field.
        location /dump {
            content_by_lua_block {
                local json_decode = require("toolkit.json").decode
                local core = require("apisix.core")
                local http = require "resty.http"
                local httpc = http.new()

                ngx.sleep(1)

                local dump_uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/v1/discovery/kubernetes/dump"
                local res, err = httpc:request_uri(dump_uri, { method = "GET"})
                if not res then
                    -- there is no response object on failure, so its status
                    -- cannot be read; report a server error instead
                    ngx.log(ngx.ERR, err)
                    ngx.status = 500
                    return
                end

                local body = json_decode(res.body)
                local endpoints = body.endpoints
                ngx.say(core.json.encode(endpoints,true))
            }
        }

_EOC_

    $block->set_value("config", $config);

});