in apisix/stream/xrpc/runner.lua [210:276]
-- Finalize the request context, but only when the protocol handed one back.
local function finalize_if_present(protocol, session, req_ctx)
    if req_ctx ~= nil then
        finialize_req(protocol, session, req_ctx)
    end
end


--- Session loop for an xRPC connection.
--
-- Each iteration:
--   1. calls protocol.from_downstream to obtain a status and a request ctx;
--   2. calls open_upstream for that ctx;
--   3. calls protocol.to_upstream with the ctx and the opened upstream;
--   4. spawns start_upstream_coroutine via ngx.thread.spawn if the upstream
--      context has no coroutine attached yet.
--
-- Status handling for steps 1 and 3: any status other than OK finalizes the
-- request ctx (when there is one); DECLINED then ends the loop and DONE skips
-- straight to the next iteration. A non-OK status from open_upstream always
-- ends the loop. The session is closed once the loop is left.
--
-- @param protocol  table providing init_downstream, from_downstream and
--                  to_upstream
-- @param conn_ctx  connection context passed to open_session
-- @return 200
function _M.run(protocol, conn_ctx)
    local session = open_session(conn_ctx)
    local downstream = protocol.init_downstream(session)

    while true do
        local read_status, req_ctx = protocol.from_downstream(session, downstream)
        if read_status ~= OK then
            finalize_if_present(protocol, session, req_ctx)

            if read_status == DECLINED then
                break
            elseif read_status == DONE then
                goto continue
            end
            -- any other status falls through to the upstream steps below
        end

        local open_status, upstream_ctx = open_upstream(protocol, session, req_ctx)
        if open_status ~= OK then
            finalize_if_present(protocol, session, req_ctx)
            break
        end

        local send_status = protocol.to_upstream(session, req_ctx, downstream,
                                                 upstream_ctx.upstream)
        if send_status ~= OK then
            finalize_if_present(protocol, session, req_ctx)

            if send_status == DECLINED then
                break
            elseif send_status == DONE then
                goto continue
            end
            -- any other status falls through to the coroutine check below
        end

        -- spawn the upstream coroutine only once per upstream context
        if not upstream_ctx.coroutine then
            local thread, spawn_err = ngx.thread.spawn(
                start_upstream_coroutine, session, protocol, downstream, upstream_ctx)
            if not thread then
                core.log.error("failed to start upstream coroutine: ", spawn_err)
                break
            end

            upstream_ctx.coroutine = thread
        end

        -- must stay the last statement of the loop body: the gotos above
        -- jump over local declarations, which is only legal for a trailing label
        ::continue::
    end

    close_session(session, protocol)
    return 200
end