t/xrpc/pingpong2.t (76 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # use t::APISIX; my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; my $version = eval { `$nginx_binary -V 2>&1` }; if ($version !~ m/\/apisix-nginx-module/) { plan(skip_all => "apisix-nginx-module not installed"); } else { plan('no_plan'); } add_block_preprocessor(sub { my ($block) = @_; if (!$block->extra_yaml_config) { my $extra_yaml_config = <<_EOC_; xrpc: protocols: - name: pingpong _EOC_ $block->set_value("extra_yaml_config", $extra_yaml_config); } my $config = $block->config // <<_EOC_; location /t { content_by_lua_block { ngx.req.read_body() local sock = ngx.socket.tcp() sock:settimeout(1000) local ok, err = sock:connect("127.0.0.1", 1985) if not ok then ngx.log(ngx.ERR, "failed to connect: ", err) return ngx.exit(503) end local bytes, err = sock:send(ngx.req.get_body_data()) if not bytes then ngx.log(ngx.ERR, "send stream request error: ", err) return ngx.exit(503) end while true do local data, err = sock:receiveany(4096) if not data then sock:close() break end ngx.print(data) end } } _EOC_ $block->set_value("config", $config); my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_; local sock = ngx.req.socket(true) sock:settimeout(10) while true do local data = sock:receiveany(4096) if not data then return end sock:send(data) end _EOC_ $block->set_value("stream_upstream_code", $stream_upstream_code); if ((!defined $block->error_log) && (!defined $block->no_error_log)) { $block->set_value("no_error_log", "[error]\nRPC is not finished"); } if (!defined $block->extra_stream_config) { my $stream_config = <<_EOC_; server { listen 8125 udp; content_by_lua_block { require("lib.mock_layer4").dogstatsd() } } _EOC_ $block->set_value("extra_stream_config", $stream_config); } $block; }); run_tests; __DATA__ === TEST 1: init --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong" }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 2: check the default timeout --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- response_body eval "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- error_log stream lua tcp socket connect timeout: 60000 lua tcp socket send timeout: 60000 stream lua tcp socket read timeout: 60000 --- log_level: debug --- stream_conf_enable === TEST 3: bad loggger filter --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {} }, conf = {} } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 4: failed to validate the 'filter' expression --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- error_log failed to validate the 'filter' expression: rule too short === TEST 5: set loggger filter(single rule) --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", ">", 10} }, conf = {} } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 6: log filter matched successful --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- error_log log filter: syslog filter result: true === TEST 7: update loggger filter --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", "<", 10} }, conf = {} } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 8: failed to match log filter --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- error_log log filter: syslog filter result: false === TEST 9: set loggger filter(multiple rules) --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", ">", 12}, {"rpc_len", "<", 14} }, conf = {} } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 10: log filter matched successful --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- error_log log filter: syslog filter result: true === TEST 11: update loggger filter --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", "<", 10}, {"rpc_len", ">", 12} }, conf = {} } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 12: failed to match log filter --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- error_log log filter: syslog filter result: false === TEST 13: set custom log format --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/plugin_metadata/syslog', ngx.HTTP_PUT, [[{ "log_format": { "client_ip": "$remote_addr" } }]] ) if code >= 300 then ngx.status = code ngx.say(body) return end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 14: no loggger filter, defaulte executed logger plugin --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", conf = { host = "127.0.0.1", port = 8125, sock_type = "udp", batch_max_size = 1, flush_limit = 1 } } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 15: verify the data received by the log server --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- wait: 0.5 --- error_log eval qr/message received:.*\"client_ip\"\:\"127.0.0.1\"/ === TEST 16: set loggger filter --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", port = 8125, sock_type = "udp", batch_max_size = 1, flush_limit = 1 } } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end ngx.say(body) } } --- request GET /t --- response_body passed === TEST 17: verify the data received by the log server --- request eval "POST /t " . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- stream_conf_enable --- wait: 0.5 --- error_log eval qr/message received:.*\"client_ip\"\:\"127.0.0.1\"/ === TEST 18: small flush_limit, instant flush --- stream_conf_enable --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", port = 5044, batch_max_size = 1, flush_limit = 1 } } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code end -- wait etcd sync ngx.sleep(0.5) local sock = ngx.socket.tcp() sock:settimeout(1000) local ok, err = sock:connect("127.0.0.1", 1985) if not ok then ngx.log(ngx.ERR, "failed to connect: ", err) return ngx.exit(503) end assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC")) while true do local data, err = sock:receiveany(4096) if not data then sock:close() break end ngx.print(data) end -- wait flush log ngx.sleep(2.5) } } --- request GET /t --- response_body eval "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- timeout: 5 --- error_log try to lock with key xrpc-pingpong-logger#table unlock with key xrpc-pingpong-logger#table === TEST 19: check plugin configuration updating --- stream_conf_enable --- config location /t { content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", port = 5044, batch_max_size = 1 } } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code ngx.say("fail") return end local sock = ngx.socket.tcp() local ok, err = sock:connect("127.0.0.1", 1985) if not ok then ngx.status = code ngx.say("fail") return end assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC")) local body1, err while true do body1, err = sock:receiveany(4096) if not data then sock:close() break end end local code, body = t('/apisix/admin/stream_routes/1', ngx.HTTP_PUT, { protocol = { name = "pingpong", logger = { { name = "syslog", filter = { {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", port = 5045, batch_max_size = 1 } } } }, upstream = { nodes = { ["127.0.0.1:1995"] = 1 }, type = "roundrobin" } } ) if code >= 300 then ngx.status = code ngx.say("fail") return end local sock = ngx.socket.tcp() local ok, err = sock:connect("127.0.0.1", 1985) if not ok then ngx.status = code ngx.say("fail") return end assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC")) local body2, err while true do body2, err = sock:receiveany(4096) if not data then sock:close() break end end ngx.print(body1) ngx.print(body2) } } --- request GET /t --- wait: 0.5 --- response_body eval "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" . "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" --- grep_error_log eval qr/sending a batch logs to 127.0.0.1:(\d+)/ --- grep_error_log_out sending a batch logs to 127.0.0.1:5044 sending a batch logs to 127.0.0.1:5045