in apisix/init.lua [1033:1149]
function _M.stream_preread_phase()
local ngx_ctx = ngx.ctx
local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
if not verify_tls_client(api_ctx) then
return ngx_exit(1)
end
core.ctx.set_vars_meta(api_ctx)
local ok, err = router.router_stream.match(api_ctx)
if not ok then
core.log.error(err)
return ngx_exit(1)
end
core.log.info("matched route: ",
core.json.delay_encode(api_ctx.matched_route, true))
local matched_route = api_ctx.matched_route
if not matched_route then
return ngx_exit(1)
end
local up_id = matched_route.value.upstream_id
if up_id then
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end
return ngx_exit(1)
end
api_ctx.matched_upstream = upstream
elseif matched_route.value.service_id then
local service = service_fetch(matched_route.value.service_id)
if not service then
core.log.error("failed to fetch service configuration by ",
"id: ", matched_route.value.service_id)
return core.response.exit(404)
end
matched_route = plugin.merge_service_stream_route(service, matched_route)
api_ctx.matched_route = matched_route
api_ctx.conf_type = "stream_route&service"
api_ctx.conf_version = matched_route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = matched_route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name
api_ctx.matched_upstream = matched_route.value.upstream
if matched_route.value.upstream_id and not matched_route.value.upstream then
local upstream = apisix_upstream.get_by_id(matched_route.value.upstream_id)
if not upstream then
if is_http then
return core.response.exit(502)
end
return ngx_exit(1)
end
api_ctx.matched_upstream = upstream
end
else
if matched_route.has_domain then
local err
matched_route, err = parse_domain_in_route(matched_route)
if err then
core.log.error("failed to get resolved route: ", err)
return ngx_exit(1)
end
api_ctx.matched_route = matched_route
end
local route_val = matched_route.value
api_ctx.matched_upstream = (matched_route.dns_value and
matched_route.dns_value.upstream)
or route_val.upstream
end
local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
api_ctx.conf_type = "stream/route"
api_ctx.conf_version = matched_route.modifiedIndex
api_ctx.conf_id = matched_route.value.id
plugin.run_plugin("preread", plugins, api_ctx)
if matched_route.value.protocol then
xrpc.run_protocol(matched_route.value.protocol, api_ctx)
return
end
local code, err = set_upstream(matched_route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
return ngx_exit(1)
end
local server, err = load_balancer.pick_server(matched_route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return ngx_exit(1)
end
api_ctx.picked_server = server
common_phase("before_proxy")
end