add_block_preprocessor()

in t/tars/discovery/stream/tars.t [30:151]


# Per-block preprocessor: fills in the configuration sections used by the
# tars discovery stream tests. The yaml_config and init-by-lua snippets are
# always set; apisix_yaml, config and stream_config are only defaulted when
# the test block does not define them itself.
add_block_preprocessor(sub {
    my ($block) = @_;

    # Static APISIX configuration: yaml config provider in data_plane role,
    # with the tars discovery pointed at a local MySQL database.
    my $yaml_config = <<_EOC_;
apisix:
  node_listen: 1984
  enable_admin: false
deployment:
    role: data_plane
    role_data_plane:
        config_provider: yaml
discovery:
  tars:
    db_conf:
      host: 127.0.0.1
      port: 3306
      database: db_tars
      user: root
      password: tars2022
    full_fetch_interval: 3
    incremental_fetch_interval: 1
_EOC_

    $block->set_value("yaml_config", $yaml_config);

    # Default to an empty route list unless the block supplies apisix_yaml.
    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_

    $block->set_value("apisix_yaml", $apisix_yaml);

    # Lua snippet that lowers the schema minimum/default of both fetch
    # intervals so the values 3 and 1 used in yaml_config above are accepted.
    my $extra_init_by_lua_start = <<_EOC_;
        -- reduce incremental_fetch_interval,full_fetch_interval
        local schema = require("apisix.discovery.tars.schema")
        schema.properties.incremental_fetch_interval.minimum=1
        schema.properties.incremental_fetch_interval.default=1
        schema.properties.full_fetch_interval.minimum = 3
        schema.properties.full_fetch_interval.default = 3
_EOC_

    # The same snippet is registered under both keys (presumably one for the
    # http subsystem and one for the stream subsystem -- confirm in t/APISIX.pm).
    $block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start);
    $block->set_value("stream_extra_init_by_lua_start", $extra_init_by_lua_start);

    # Default http config: a /sql endpoint that runs the request body as a
    # SQL statement against db_tars and answers "DONE" on success. Every
    # failure is now reported in the response body (previously a failed
    # mysql:new()/connect() produced an empty response), and the connection
    # is closed explicitly once the query has completed.
    my $config = $block->config // <<_EOC_;

        location /sql {
            content_by_lua_block {
                local mysql = require("resty.mysql")
                local core = require("apisix.core")

                ngx.req.read_body()
                local sql = ngx.req.get_body_data()
                core.log.info("get sql ", sql)

                local db_conf= {
                  host="127.0.0.1",
                  port=3306,
                  database="db_tars",
                  user="root",
                  password="tars2022",
                }

                local db_cli, err = mysql:new()
                if not db_cli then
                  core.log.error("failed to instantiate mysql: ", err)
                  ngx.say("failed to instantiate mysql: ", err)
                  return
                end
                db_cli:set_timeout(3000)

                local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
                if not ok then
                  core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
                  ngx.say("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
                  return
                end

                local res, err, errcode, sqlstate = db_cli:query(sql)
                -- the connection is not reused, so release it on both the
                -- success and the failure path before replying
                db_cli:close()
                if not res then
                   ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
                   return
                end
                ngx.say("DONE")
            }
        }
_EOC_

    $block->set_value("config", $config);

    # Default stream server on port 8125: reads one line holding a JSON array
    # of service names, looks each one up through the tars discovery module
    # and replies with the node counts formatted as "{ n1 n2 ... }".
    my $stream_config = $block->stream_config // <<_EOC_;
        server {
            listen 8125;
            content_by_lua_block {
                local core = require("apisix.core")
                local d = require("apisix.discovery.tars")

                -- presumably gives the discovery fetch timers time to run
                -- before the lookup (TODO confirm)
                ngx.sleep(2)

                local sock, err = ngx.req.socket()
                if not sock then
                  core.log.error("failed to get request socket: ", err)
                  return
                end

                local request_body, err = sock:receive()
                if not request_body then
                  core.log.error("failed to receive request body: ", err)
                  return
                end

                core.log.info("get body ", request_body)

                -- reject anything that is not a JSON array/object up front
                -- instead of failing later inside ipairs
                local queries, err = core.json.decode(request_body)
                if type(queries) ~= "table" then
                  core.log.error("failed to decode request body: ", err)
                  return
                end

                local response_body = "{"
                for _,query in ipairs(queries) do
                  local nodes = d.nodes(query)
                  if nodes==nil or #nodes==0 then
                      response_body=response_body.." "..0
                  else
                      response_body=response_body.." "..#nodes
                  end
                end
                ngx.say(response_body.." }")
            }
        }

_EOC_

    # The resolved stream_config section is stored under extra_stream_config.
    $block->set_value("extra_stream_config", $stream_config);

});