t/kubernetes/discovery/stream/kubernetes.t (141 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
BEGIN {
our $token_file = "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token";
our $token_value = eval {`cat $token_file 2>/dev/null`};
our $yaml_config = <<_EOC_;
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
- id: first
service:
host: "127.0.0.1"
port: "6443"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
- id: second
service:
schema: "http"
host: "127.0.0.1"
port: "6445"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
_EOC_
}
use t::APISIX;
my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
my $version = eval { `$nginx_binary -V 2>&1` };
plan('no_plan');
repeat_each(1);
log_level('warn');
no_root_location();
no_shuffle();
workers(4);
add_block_preprocessor(sub {
my ($block) = @_;
my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_
$block->set_value("apisix_yaml", $apisix_yaml);
my $main_config = $block->main_config // <<_EOC_;
env KUBERNETES_SERVICE_HOST=127.0.0.1;
env KUBERNETES_SERVICE_PORT=6443;
env KUBERNETES_CLIENT_TOKEN=$::token_value;
env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
_EOC_
$block->set_value("main_config", $main_config);
my $config = $block->config // <<_EOC_;
location /operators {
content_by_lua_block {
local http = require("resty.http")
local core = require("apisix.core")
local ipairs = ipairs
ngx.req.read_body()
local request_body = ngx.req.get_body_data()
local operators = core.json.decode(request_body)
core.log.info("get body ", request_body)
core.log.info("get operators ", #operators)
for _, op in ipairs(operators) do
local method, path, body
local headers = {
["Host"] = "127.0.0.1:6445"
}
if op.op == "replace_subsets" then
method = "PATCH"
path = "/api/v1/namespaces/" .. op.namespace .. "/endpoints/" .. op.name
if #op.subsets == 0 then
body = '[{"path":"/subsets","op":"replace","value":[]}]'
else
local t = { { op = "replace", path = "/subsets", value = op.subsets } }
body = core.json.encode(t, true)
end
headers["Content-Type"] = "application/json-patch+json"
end
if op.op == "replace_labels" then
method = "PATCH"
path = "/api/v1/namespaces/" .. op.namespace .. "/endpoints/" .. op.name
local t = { { op = "replace", path = "/metadata/labels", value = op.labels } }
body = core.json.encode(t, true)
headers["Content-Type"] = "application/json-patch+json"
end
local httpc = http.new()
core.log.info("begin to connect ", "127.0.0.1:6445")
local ok, message = httpc:connect({
scheme = "http",
host = "127.0.0.1",
port = 6445,
})
if not ok then
core.log.error("connect 127.0.0.1:6445 failed, message : ", message)
ngx.say("FAILED")
end
local res, err = httpc:request({
method = method,
path = path,
headers = headers,
body = body,
})
if err ~= nil then
core.log.err("operator k8s cluster error: ", err)
return 500
end
if res.status ~= 200 and res.status ~= 201 and res.status ~= 409 then
return res.status
end
end
ngx.say("DONE")
}
}
_EOC_
$block->set_value("config", $config);
my $stream_config = $block->stream_config // <<_EOC_;
server {
listen 8125;
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.kubernetes")
ngx.sleep(1)
local sock = ngx.req.socket()
local request_body = sock:receive()
core.log.info("get body ", request_body)
local response_body = "{"
local queries = core.json.decode(request_body)
for _,query in ipairs(queries) do
local nodes = d.nodes(query)
if nodes==nil or #nodes==0 then
response_body=response_body.." "..0
else
response_body=response_body.." "..#nodes
end
end
ngx.say(response_body.." }")
}
}
_EOC_
$block->set_value("extra_stream_config", $stream_config);
});
run_tests();
__DATA__
=== TEST 1: create namespace and endpoints
--- yaml_config eval: $::yaml_config
--- request
POST /operators
[
{
"op": "replace_subsets",
"namespace": "ns-a",
"name": "ep",
"subsets": [
{
"addresses": [
{
"ip": "10.0.0.1"
},
{
"ip": "10.0.0.2"
}
],
"ports": [
{
"name": "p1",
"port": 5001
}
]
},
{
"addresses": [
{
"ip": "20.0.0.1"
},
{
"ip": "20.0.0.2"
}
],
"ports": [
{
"name": "p2",
"port": 5002
}
]
}
]
},
{
"op": "create_namespace",
"name": "ns-b"
},
{
"op": "replace_subsets",
"namespace": "ns-b",
"name": "ep",
"subsets": [
{
"addresses": [
{
"ip": "10.0.0.1"
},
{
"ip": "10.0.0.2"
}
],
"ports": [
{
"name": "p1",
"port": 5001
}
]
},
{
"addresses": [
{
"ip": "20.0.0.1"
},
{
"ip": "20.0.0.2"
}
],
"ports": [
{
"name": "p2",
"port": 5002
}
]
}
]
},
{
"op": "create_namespace",
"name": "ns-c"
},
{
"op": "replace_subsets",
"namespace": "ns-c",
"name": "ep",
"subsets": [
{
"addresses": [
{
"ip": "10.0.0.1"
},
{
"ip": "10.0.0.2"
}
],
"ports": [
{
"port": 5001
}
]
},
{
"addresses": [
{
"ip": "20.0.0.1"
},
{
"ip": "20.0.0.2"
}
],
"ports": [
{
"port": 5002
}
]
}
]
}
]
--- more_headers
Content-type: application/json
=== TEST 2: use default parameters
--- yaml_config eval: $::yaml_config
--- apisix_yaml
stream_routes:
-
id: 1
server_port: 1985
upstream_id: 1
upstreams:
- nodes:
"127.0.0.1:8125": 1
type: roundrobin
id: 1
#END
--- stream_request
["first/ns-a/ep:p1","first/ns-a/ep:p2","first/ns-b/ep:p1","second/ns-a/ep:p1","second/ns-a/ep:p2","second/ns-b/ep:p1"]
--- stream_response eval
qr{ 2 2 2 2 2 2 }