in t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua [83:169]
--- Parse one pingpong request arriving on the downstream connection.
--
-- Header bytes consumed here: [0..1] must both be the byte of "p",
-- [2] is the message type, [3..4] form the stream id (high byte first),
-- and the body length is decoded with to_int32(hdr, 6).
--
-- @param session    xRPC session, passed through to the sdk helpers
-- @param downstream downstream connection object: requests are read from it
--                   and heartbeat headers are sent back through it
-- @return DECLINED when a read yields nil, the magic bytes do not match, or
--         sdk.set_upstream reports an error; DONE once a heartbeat header has
--         been sent back; otherwise OK followed by the request ctx
function _M.from_downstream(session, downstream)
    core.log.info("call pingpong's from_downstream")

    local hdr = read_data(downstream, HDR_LEN, false)
    if hdr == nil then
        return DECLINED
    end

    local magic = str_byte("p")
    if not (hdr[0] == magic and hdr[1] == magic) then
        core.log.error("invalid magic number: ", ffi_str(hdr, 2))
        return DECLINED
    end

    local typ = hdr[2]
    if typ == TYPE_HEARTBEAT then
        core.log.info("send heartbeat")
        -- heartbeats are answered right here: reset the read buffer first,
        -- then send the received header back
        downstream:reset_read_buf()
        downstream:send(ffi_str(hdr, HDR_LEN))
        return DONE
    end

    local is_dyn_up = typ == TYPE_UNARY_DYN_UP
    local stream_id = hdr[3] * 256 + hdr[4]
    local ctx = sdk.get_req_ctx(session, stream_id)

    local body_len = to_int32(hdr, 6)
    core.log.info("read body len: ", body_len)

    if is_dyn_up then
        -- this message type carries 4 extra bytes naming the target service
        local name_buf = read_data(downstream, 4, false)
        if name_buf == nil then
            return DECLINED
        end

        -- the name ends at the first zero byte, or fills all 4 bytes
        local name_len = 0
        while name_len < 4 and name_buf[name_len] ~= 0 do
            name_len = name_len + 1
        end

        local service = ffi_str(name_buf, name_len)
        core.log.info("get service [", service, "]")
        ctx.service = service

        -- rebuild the service -> route lookup table whenever the sdk reports
        -- that the router changed since the cached version
        local changed, raw_router, version = sdk.get_router(session, router_version)
        if changed then
            router_version = version
            router = {}

            for _, route in ipairs(raw_router) do
                local proto_conf = route.protocol.conf
                local key = proto_conf and proto_conf.service
                if key then
                    router[key] = route
                end
            end
        end

        -- an unknown service is not an error: the upstream is left untouched
        local matched = router[service]
        if matched then
            local err = sdk.set_upstream(session, matched)
            if err then
                core.log.error("failed to set upstream: ", err)
                return DECLINED
            end
        end
    end

    -- only the success of the body read matters; its result is not used here
    if read_data(downstream, body_len, true) == nil then
        return DECLINED
    end

    -- total length covers the header, the body and, for the dynamic-upstream
    -- type, the 4 service-name bytes
    local total_len = HDR_LEN + body_len
    if is_dyn_up then
        total_len = total_len + 4
    end

    ctx.id = stream_id
    ctx.len = total_len
    ctx.is_stream = typ == TYPE_STREAM
    ctx.is_unary = is_dyn_up or typ == TYPE_UNARY

    return OK, ctx
end