t/tars/discovery/tars.t (118 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';
repeat_each(1);
log_level('warn');
no_root_location();
no_shuffle();
workers(4);
add_block_preprocessor(sub {
my ($block) = @_;
my $yaml_config = <<_EOC_;
apisix:
node_listen: 1984
enable_admin: false
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
tars:
db_conf:
host: 127.0.0.1
port: 3306
database: db_tars
user: root
password: tars2022
full_fetch_interval: 3
incremental_fetch_interval: 1
_EOC_
$block->set_value("yaml_config", $yaml_config);
my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_
$block->set_value("apisix_yaml", $apisix_yaml);
my $extra_init_by_lua_start = <<_EOC_;
-- reduce incremental_fetch_interval,full_fetch_interval
local schema = require("apisix.discovery.tars.schema")
schema.properties.incremental_fetch_interval.minimum=1
schema.properties.incremental_fetch_interval.default=1
schema.properties.full_fetch_interval.minimum = 3
schema.properties.full_fetch_interval.default = 3
_EOC_
$block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start);
my $config = $block->config // <<_EOC_;
location /count {
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.tars")
ngx.sleep(2)
ngx.req.read_body()
local request_body = ngx.req.get_body_data()
local queries = core.json.decode(request_body)
local response_body = "{"
for _,query in ipairs(queries) do
local nodes = d.nodes(query)
if nodes==nil or #nodes==0 then
response_body=response_body.." "..0
else
response_body=response_body.." "..#nodes
end
end
ngx.say(response_body.." }")
}
}
location /nodes {
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.tars")
ngx.sleep(2)
ngx.req.read_body()
local servant = ngx.req.get_body_data()
local response=""
local nodes = d.nodes(servant)
response="{"
for _,node in ipairs(nodes or {}) do
response=response..node.host..":"..node.port..","
end
response=response.."}"
ngx.say(response)
}
}
location /sql {
content_by_lua_block {
local mysql = require("resty.mysql")
local core = require("apisix.core")
local ipairs = ipairs
ngx.req.read_body()
local sql = ngx.req.get_body_data()
core.log.info("get sql ", sql)
local db_conf= {
host="127.0.0.1",
port=3306,
database="db_tars",
user="root",
password="tars2022",
}
local db_cli, err = mysql:new()
if not db_cli then
core.log.error("failed to instantiate mysql: ", err)
return
end
db_cli:set_timeout(3000)
local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
if not ok then
core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
return
end
local res, err, errcode, sqlstate = db_cli:query(sql)
if not res then
ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return
end
ngx.say("DONE")
}
}
_EOC_
$block->set_value("config", $config);
});
run_tests();
__DATA__
=== TEST 1: create initial server and servant
--- timeout: 3
--- request eval
[
"POST /sql
truncate table t_server_conf",
"POST /sql
truncate table t_adapter_conf",
"POST /sql
insert into t_server_conf(application, server_name, node_name, registry_timestamp,
template_name, setting_state, present_state, server_type)
values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp')",
"POST /sql
insert into t_adapter_conf(application, server_name, node_name, adapter_name, endpoint, servant)
values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter',
'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter',
'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'),
('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter',
'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')",
"GET /count
[\"A.AServer.FirstObj\",\"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"DONE\n",
"DONE\n",
"DONE\n",
"DONE\n",
"{ 1 1 1 }\n",
]
=== TEST 2: add servers on different nodes
--- timeout: 3
--- request eval
[
"POST /sql
insert into t_server_conf(application, server_name, node_name, registry_timestamp,
template_name, setting_state, present_state, server_type)
values ('A', 'AServer', '172.16.1.2', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('B', 'BServer', '172.16.2.2', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('C', 'CServer', '172.16.3.2', now(), 'taf-cpp', 'active', 'active', 'tars_cpp')",
"GET /count
[\"A.AServer.FirstObj\",\"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"DONE\n",
"{ 1 1 1 }\n",
]
=== TEST 3: add servant
--- timeout: 3
--- request eval
[
"POST /sql
insert into t_adapter_conf(application, server_name, node_name, adapter_name, endpoint, servant)
values ('A', 'AServer', '172.16.1.2', 'A.AServer.FirstObjAdapter',
'tcp -h 172.16.1.2 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
('A', 'AServer', '172.16.1.2', 'A.AServer.SecondObjAdapter',
'tcp -p 10002 -h 172.16.1.2 -e 0 -t 6000', 'A.AServer.SecondObj')",
"GET /count
[\"A.AServer.FirstObj\", \"A.AServer.SecondObj\", \"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"DONE\n",
"{ 2 1 1 1 }\n",
]
=== TEST 4: update servant, update setting_state
--- timeout: 3
--- request eval
[
"POST /sql
update t_server_conf set setting_state='inactive'
where application = 'A' and server_name = 'AServer' and node_name = '172.16.1.2'",
"GET /count
[\"A.AServer.FirstObj\", \"A.AServer.SecondObj\", \"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"DONE\n",
"{ 1 0 1 1 }\n",
]
=== TEST 5: update server setting_state
--- timeout: 3
--- request eval
[
"POST /sql
update t_server_conf set setting_state='active', present_state='inactive'
where application = 'A' and server_name = 'AServer' and node_name = '172.16.1.2'",
"GET /count
[\"A.AServer.FirstObj\", \"A.AServer.SecondObj\", \"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"DONE\n",
"{ 1 0 1 1 }\n",
]
=== TEST 6: update server present_state
--- timeout: 3
--- request eval
[
"POST /sql
update t_server_conf set setting_state='active', present_state='active'
where application = 'A' and server_name = 'AServer' and node_name = '172.16.1.2'",
"GET /count
[\"A.AServer.FirstObj\", \"A.AServer.SecondObj\", \"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"DONE\n",
"{ 2 1 1 1 }\n",
]
=== TEST 7: update servant endpoint
--- timeout: 3
--- request eval
[
"GET /nodes
A.AServer.SecondObj",
"POST /sql
update t_adapter_conf set endpoint='tcp -h 172.16.1.2 -p 10003 -e 0 -t 3000'
where application = 'A' and server_name = 'AServer'
and node_name = '172.16.1.2' and servant='A.AServer.SecondObj'",
"GET /nodes
A.AServer.SecondObj",
]
--- response_body eval
[
"{172.16.1.2:10002,}\n",
"DONE\n",
"{172.16.1.2:10003,}\n",
]
=== TEST 8: delete servant
--- request eval
[
"POST /sql
delete from t_adapter_conf where application = 'A' and server_name = 'AServer'
and node_name = '172.16.1.2' and servant = 'A.AServer.SecondObj'",
]
--- response_body eval
[
"DONE\n",
]
=== TEST 9: count after delete servant
--- timeout: 4
--- wait: 3
--- request eval
[
"GET /count
[\"A.AServer.FirstObj\", \"A.AServer.SecondObj\", \"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"{ 2 0 1 1 }\n",
]
=== TEST 10: delete server
--- request eval
[
"POST /sql
delete from t_server_conf
where application = 'A' and server_name = 'AServer' and node_name = '172.16.1.1'",
]
--- response_body eval
[
"DONE\n",
]
=== TEST 11: count after delete
--- timeout: 4
--- wait: 3
--- request eval
[
"GET /count
[\"A.AServer.FirstObj\", \"A.AServer.SecondObj\", \"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
]
--- response_body eval
[
"{ 1 0 1 1 }\n",
]