function _M.from_downstream()

in t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua [83:169]


function _M.from_downstream(session, downstream)
    
    
    core.log.info("call pingpong's from_downstream")

    local p = read_data(downstream, HDR_LEN, false)
    if p == nil then
        return DECLINED
    end

    local p_b = str_byte("p")
    if p[0] ~= p_b or p[1] ~= p_b then
        core.log.error("invalid magic number: ", ffi_str(p, 2))
        return DECLINED
    end

    local typ = p[2]
    if typ == TYPE_HEARTBEAT then
        core.log.info("send heartbeat")

        
        downstream:reset_read_buf()
        downstream:send(ffi_str(p, HDR_LEN))
        return DONE
    end

    local stream_id = p[3] * 256 + p[4]
    local ctx = sdk.get_req_ctx(session, stream_id)

    local body_len = to_int32(p, 6)
    core.log.info("read body len: ", body_len)

    if typ == TYPE_UNARY_DYN_UP then
        local p = read_data(downstream, 4, false)
        if p == nil then
            return DECLINED
        end

        local len = 4
        for i = 0, 3 do
            if p[i] == 0 then
                len = i
                break
            end
        end
        local service = ffi_str(p, len)
        core.log.info("get service [", service, "]")
        ctx.service = service

        local changed, raw_router, version = sdk.get_router(session, router_version)
        if changed then
            router_version = version
            router = {}

            for _, r in ipairs(raw_router) do
                local conf = r.protocol.conf
                if conf and conf.service then
                    router[conf.service] = r
                end
            end
        end

        local conf = router[ctx.service]
        if conf then
            local err = sdk.set_upstream(session, conf)
            if err then
                core.log.error("failed to set upstream: ", err)
                return DECLINED
            end
        end
    end

    local p = read_data(downstream, body_len, true)
    if p == nil then
        return DECLINED
    end

    ctx.is_unary = typ == TYPE_UNARY or typ == TYPE_UNARY_DYN_UP
    ctx.is_stream = typ == TYPE_STREAM
    ctx.id = stream_id
    ctx.len = HDR_LEN + body_len
    if typ == TYPE_UNARY_DYN_UP then
        ctx.len = ctx.len + 4
    end

    return OK, ctx
end