function _M.handle_upstream()

in apisix/init.lua [455:566]


--- Resolve the upstream for a matched route and hand the request over to it.
--
-- Processing order:
--   1. When `api_ctx.bypass_nginx_upstream` is set, only the "before_proxy"
--      phase is run and the function returns.
--   2. The upstream is located either by id (`api_ctx.upstream_id` wins over
--      `route.value.upstream_id`) or taken inline from the route; routes
--      flagged with `has_domain` are first passed through
--      `parse_domain_in_route`.
--   3. If the upstream has `tls.client_cert_id`, the referenced ssl object
--      must exist and have type "client"; it is stored in
--      `api_ctx.upstream_ssl`.
--   4. With `enable_websocket`, `http_upgrade` / `http_connection` are copied
--      into `upstream_upgrade` / `upstream_connection` on `api_ctx.var`.
--   5. Upstreams whose scheme is "kafka" are delegated to
--      `pubsub_kafka.access`.
--   6. Otherwise: `set_upstream`, `load_balancer.pick_server`,
--      `set_upstream_headers`, the "before_proxy" phase, and finally an
--      internal `ngx.exec` for grpc/grpcs or dubbo.
--
-- @param api_ctx           per-request context table; read and updated in place
-- @param route             matched route object (`route.value` holds its config)
-- @param enable_websocket  truthy to forward the upgrade/connection variables
-- @return on early exits, whatever the terminating call returns
--         (`core.response.exit`, `ngx_exit`, `pubsub_kafka.access`,
--         `ngx.exec`); otherwise nothing
function _M.handle_upstream(api_ctx, route, enable_websocket)
    -- The nginx upstream machinery is skipped entirely when this flag is set;
    -- only the before_proxy phase still runs.
    if api_ctx.bypass_nginx_upstream then
        common_phase("before_proxy")
        return
    end

    local up_id = route.value.upstream_id

    -- An upstream id placed on the request context overrides the route's own.
    if api_ctx.upstream_id then
        up_id = api_ctx.upstream_id
    end

    if up_id then
        local upstream = apisix_upstream.get_by_id(up_id)
        if not upstream then
            -- `is_http` comes from outside this function; the non-HTTP path
            -- terminates through ngx_exit(1) instead of an HTTP status.
            if is_http then
                return core.response.exit(502)
            end

            return ngx_exit(1)
        end

        api_ctx.matched_upstream = upstream

    else
        if route.has_domain then
            local err
            route, err = parse_domain_in_route(route)
            if err then
                core.log.error("failed to get resolved route: ", err)
                return core.response.exit(500)
            end

            -- Keep the context in sync with the route object returned by
            -- parse_domain_in_route.
            api_ctx.conf_version = route.modifiedIndex
            api_ctx.matched_route = route
        end

        local route_val = route.value

        -- Prefer the upstream under `dns_value` when present, otherwise fall
        -- back to the upstream configured inline on the route.
        api_ctx.matched_upstream = (route.dns_value and
                                    route.dns_value.upstream)
                                   or route_val.upstream
    end

    if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
        api_ctx.matched_upstream.tls.client_cert_id then

        local cert_id = api_ctx.matched_upstream.tls.client_cert_id
        local upstream_ssl = router.router_ssl.get_by_id(cert_id)
        if not upstream_ssl or upstream_ssl.type ~= "client" then
            -- NOTE(review): "not exits" looks like a typo for "not exist";
            -- left unchanged because log text may be matched elsewhere.
            local err  = upstream_ssl and
                "ssl type should be 'client'" or
                "ssl id [" .. cert_id .. "] not exits"
            core.log.error("failed to get ssl cert: ", err)

            if is_http then
                return core.response.exit(502)
            end

            return ngx_exit(1)
        end

        core.log.info("matched ssl: ",
                  core.json.delay_encode(upstream_ssl, true))
        api_ctx.upstream_ssl = upstream_ssl
    end

    if enable_websocket then
        api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade
        api_ctx.var.upstream_connection = api_ctx.var.http_connection
        core.log.info("enabled websocket for route: ", route.value.id)
    end

    -- A "kafka" upstream bypasses set_upstream and the load balancer below;
    -- the request is handed to the kafka pubsub handler instead.
    if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
        return pubsub_kafka.access(api_ctx)
    end

    local code, err = set_upstream(route, api_ctx)
    if code then
        core.log.error("failed to set upstream: ", err)
        -- Return here like every other failure path in this function, so the
        -- code below can never run against an upstream that failed to set up,
        -- even if core.response.exit hands control back to the caller.
        return core.response.exit(code)
    end

    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    api_ctx.picked_server = server

    set_upstream_headers(api_ctx, server)

    -- before_proxy runs here, ahead of any internal redirect issued below.
    common_phase("before_proxy")

    local up_scheme = api_ctx.upstream_scheme
    if up_scheme == "grpcs" or up_scheme == "grpc" then
        -- stash_ngx_ctx() is called before each internal redirect; presumably
        -- it preserves ngx.ctx across ngx.exec - confirm against its definition.
        stash_ngx_ctx()
        return ngx.exec("@grpc_pass")
    end

    if api_ctx.dubbo_proxy_enabled then
        stash_ngx_ctx()
        return ngx.exec("@dubbo_pass")
    end
end