function _M.stream_preread_phase()

in apisix/init.lua [1033:1149]


-- Handler for the stream preread phase.
--
-- Steps performed, in order:
--   1. fetch a pooled `api_ctx`, publish it on `ngx.ctx` and run
--      `verify_tls_client` on it;
--   2. match a stream route through `router.router_stream.match`;
--   3. fill `api_ctx.matched_upstream` from, in priority order, the route's
--      `upstream_id`, the service referenced by `service_id` (merged into the
--      route), or the route's own inline / DNS-resolved upstream;
--   4. build the plugin list and run the "preread" plugins;
--   5. if the route declares a `protocol`, delegate to `xrpc.run_protocol`;
--      otherwise set the upstream, pick a server and run "before_proxy".
--
-- Takes no arguments. Every failure path terminates through `ngx_exit(1)`,
-- except the two `core.response.exit(...)` calls noted below.
function _M.stream_preread_phase()
    local ngx_ctx = ngx.ctx
    local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
    ngx_ctx.api_ctx = api_ctx

    if not verify_tls_client(api_ctx) then
        return ngx_exit(1)
    end

    core.ctx.set_vars_meta(api_ctx)

    local ok, match_err = router.router_stream.match(api_ctx)
    if not ok then
        core.log.error(match_err)
        return ngx_exit(1)
    end

    core.log.info("matched route: ",
                  core.json.delay_encode(api_ctx.matched_route, true))

    local matched_route = api_ctx.matched_route
    if not matched_route then
        return ngx_exit(1)
    end

    -- true once the route has been merged with its service; in that case the
    -- service-aware conf_type/conf_version/conf_id set below must be kept
    local merged_with_service = false
    -- upstream id that still has to be resolved through apisix_upstream
    local pending_upstream_id

    local route_val = matched_route.value
    if route_val.upstream_id then
        pending_upstream_id = route_val.upstream_id

    elseif route_val.service_id then
        local service = service_fetch(route_val.service_id)
        if not service then
            core.log.error("failed to fetch service configuration by ",
                    "id: ", route_val.service_id)
            -- NOTE(review): every other failure path here uses ngx_exit(1);
            -- confirm core.response.exit is valid in the stream subsystem.
            return core.response.exit(404)
        end

        matched_route = plugin.merge_service_stream_route(service, matched_route)
        api_ctx.matched_route = matched_route
        api_ctx.conf_type = "stream_route&service"
        api_ctx.conf_version = matched_route.modifiedIndex .. "&" .. service.modifiedIndex
        api_ctx.conf_id = matched_route.value.id .. "&" .. service.value.id
        api_ctx.service_id = service.value.id
        api_ctx.service_name = service.value.name
        merged_with_service = true

        -- prefer the inline upstream of the merged route; fall back to its
        -- upstream_id only when no inline upstream is present
        local merged_val = matched_route.value
        api_ctx.matched_upstream = merged_val.upstream
        if merged_val.upstream_id and not merged_val.upstream then
            pending_upstream_id = merged_val.upstream_id
        end

    else
        if matched_route.has_domain then
            local resolve_err
            matched_route, resolve_err = parse_domain_in_route(matched_route)
            if resolve_err then
                core.log.error("failed to get resolved route: ", resolve_err)
                return ngx_exit(1)
            end

            api_ctx.matched_route = matched_route
        end

        -- a DNS-resolved upstream, when present, wins over the configured one
        api_ctx.matched_upstream = (matched_route.dns_value and
                                    matched_route.dns_value.upstream)
                                   or matched_route.value.upstream
    end

    -- single lookup shared by the route-level and service-level upstream_id
    if pending_upstream_id then
        local upstream = apisix_upstream.get_by_id(pending_upstream_id)
        if not upstream then
            if is_http then
                return core.response.exit(502)
            end

            return ngx_exit(1)
        end

        api_ctx.matched_upstream = upstream
    end

    local plugins = core.tablepool.fetch("plugins", 32, 0)
    api_ctx.plugins = plugin.stream_filter(matched_route, plugins)

    -- Plain stream route: identify the configuration by the route alone.
    -- When a service was merged in, the combined identifiers assigned in the
    -- service branch are kept so that a change of the service's
    -- modifiedIndex is reflected in conf_version.
    if not merged_with_service then
        api_ctx.conf_type = "stream/route"
        api_ctx.conf_version = matched_route.modifiedIndex
        api_ctx.conf_id = matched_route.value.id
    end

    plugin.run_plugin("preread", plugins, api_ctx)

    -- routes with a protocol are handed to xrpc; nothing below runs for them
    if matched_route.value.protocol then
        xrpc.run_protocol(matched_route.value.protocol, api_ctx)
        return
    end

    -- set_upstream signals failure by returning a truthy code
    local code, set_err = set_upstream(matched_route, api_ctx)
    if code then
        core.log.error("failed to set upstream: ", set_err)
        return ngx_exit(1)
    end

    local server, pick_err = load_balancer.pick_server(matched_route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", pick_err)
        return ngx_exit(1)
    end

    api_ctx.picked_server = server

    -- a server has been picked: run the "before_proxy" phase
    common_phase("before_proxy")
end