function _M.run()

in apisix/stream/xrpc/runner.lua [210:276]


function _M.run(protocol, conn_ctx)
    local session = open_session(conn_ctx)
    local downstream = protocol.init_downstream(session)

    while true do
        local status, ctx = protocol.from_downstream(session, downstream)
        if status ~= OK then
            if ctx ~= nil then
                finialize_req(protocol, session, ctx)
            end

            if status == DECLINED then
                
                break
            end

            if status == DONE then
                
                goto continue
            end
        end

        
        local status, up_ctx = open_upstream(protocol, session, ctx)
        if status ~= OK then
            if ctx ~= nil then
                finialize_req(protocol, session, ctx)
            end

            break
        end

        status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream)
        if status ~= OK then
            if ctx ~= nil then
                finialize_req(protocol, session, ctx)
            end

            if status == DECLINED then
                break
            end

            if status == DONE then
                
                goto continue
            end
        end

        if not up_ctx.coroutine then
            local co, err = ngx.thread.spawn(
                start_upstream_coroutine, session, protocol, downstream, up_ctx)
            if not co then
                core.log.error("failed to start upstream coroutine: ", err)
                break
            end

            up_ctx.coroutine = co
        end

        ::continue::
    end

    close_session(session, protocol)

    
    return 200
end