in apisix/init.lua [455:566]
--- Resolve the upstream for a matched route, pick a backend server and hand
-- the request over to the proxy stage.
--
-- The steps, in order:
--   1. If `api_ctx.bypass_nginx_upstream` is set, only run the
--      `before_proxy` phase and return.
--   2. Resolve the upstream: by id (`api_ctx.upstream_id` takes precedence
--      over `route.value.upstream_id`), or from the route's inline
--      upstream (re-resolving domains first when `route.has_domain`).
--   3. Load the client certificate referenced by
--      `upstream.tls.client_cert_id`, if any.
--   4. Copy the upgrade/connection request variables when websocket is on.
--   5. Delegate `kafka` upstreams to `pubsub_kafka.access`.
--   6. Set the upstream, pick a server, set upstream headers, run
--      `before_proxy`, and jump to `@grpc_pass` / `@dubbo_pass` if needed.
--
-- @param api_ctx          per-request context table; this function reads and
--                         writes several fields on it (matched_upstream,
--                         matched_route, conf_version, upstream_ssl,
--                         picked_server, var.*)
-- @param route            matched route object (uses `.value`, `.has_domain`,
--                         `.dns_value`, `.modifiedIndex`)
-- @param enable_websocket truthy to forward the Upgrade/Connection variables
-- @return whatever the terminating call returns on an early exit
--         (`core.response.exit`, `ngx_exit`, `pubsub_kafka.access`,
--         `ngx.exec`); nothing on the normal proxy path
function _M.handle_upstream(api_ctx, route, enable_websocket)
    -- nginx upstream is bypassed: just run the before_proxy phase and stop
    if api_ctx.bypass_nginx_upstream then
        common_phase("before_proxy")
        return
    end

    -- an upstream id set on the context overrides the one from the route
    local up_id = route.value.upstream_id
    if api_ctx.upstream_id then
        up_id = api_ctx.upstream_id
    end

    if up_id then
        local upstream = apisix_upstream.get_by_id(up_id)
        if not upstream then
            if is_http then
                return core.response.exit(502)
            end

            return ngx_exit(1)
        end

        api_ctx.matched_upstream = upstream

    else
        if route.has_domain then
            local err
            -- note: `route` is replaced by the resolved route from here on
            route, err = parse_domain_in_route(route)
            if err then
                core.log.error("failed to get resolved route: ", err)
                return core.response.exit(500)
            end

            api_ctx.conf_version = route.modifiedIndex
            api_ctx.matched_route = route
        end

        -- prefer the DNS-resolved upstream when one is attached to the route
        local route_val = route.value
        api_ctx.matched_upstream = (route.dns_value and
                                    route.dns_value.upstream)
                                   or route_val.upstream
    end

    -- upstream references a client certificate by id: look it up and make
    -- sure it is an ssl object of type "client"
    if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
        api_ctx.matched_upstream.tls.client_cert_id then

        local cert_id = api_ctx.matched_upstream.tls.client_cert_id
        local upstream_ssl = router.router_ssl.get_by_id(cert_id)
        if not upstream_ssl or upstream_ssl.type ~= "client" then
            local err = upstream_ssl and
                "ssl type should be 'client'" or
                "ssl id [" .. cert_id .. "] not exits"
            core.log.error("failed to get ssl cert: ", err)

            if is_http then
                return core.response.exit(502)
            end

            return ngx_exit(1)
        end

        core.log.info("matched ssl: ",
                      core.json.delay_encode(upstream_ssl, true))
        api_ctx.upstream_ssl = upstream_ssl
    end

    if enable_websocket then
        -- pass the client's Upgrade/Connection values on to the upstream vars
        api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade
        api_ctx.var.upstream_connection = api_ctx.var.http_connection
        core.log.info("enabled websocket for route: ", route.value.id)
    end

    -- kafka upstreams are handled entirely by the pubsub module
    if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
        return pubsub_kafka.access(api_ctx)
    end

    local code, err = set_upstream(route, api_ctx)
    if code then
        core.log.error("failed to set upstream: ", err)
        -- `return` so that we never fall through to server picking if the
        -- exit helper returns instead of aborting; matches every other
        -- exit call in this function
        return core.response.exit(code)
    end

    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    api_ctx.picked_server = server

    set_upstream_headers(api_ctx, server)

    -- run the before_proxy phase
    common_phase("before_proxy")

    -- gRPC and dubbo are proxied from dedicated named locations.
    -- NOTE(review): stash_ngx_ctx() presumably preserves ngx.ctx across the
    -- internal redirect done by ngx.exec -- confirm against its definition.
    local up_scheme = api_ctx.upstream_scheme
    if up_scheme == "grpcs" or up_scheme == "grpc" then
        stash_ngx_ctx()
        return ngx.exec("@grpc_pass")
    end

    if api_ctx.dubbo_proxy_enabled then
        stash_ngx_ctx()
        return ngx.exec("@dubbo_pass")
    end
end