in apisix/core/pubsub.lua [160:235]
function _M.wait(self)
local fatal_err
local ws = self.ws_server
while true do
local raw_data, raw_type, err = ws:recv_frame()
if err then
if ws.fatal then
fatal_err = err
break
end
log.error("failed to receive websocket frame: ", err)
goto continue
end
if raw_type == "close" then
break
end
if raw_type ~= "binary" then
log.warn("pubsub server receive non-binary data, type: ",
raw_type, ", data: ", raw_data)
goto continue
end
if pb.state() ~= pb_state then
pb.state(pb_state)
end
local data, err = pb.decode("PubSubReq", raw_data)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
goto continue
end
local sequence = data.sequence
local cmd, params = get_cmd(data)
if not cmd and not params then
log.warn("pubsub server receives empty command")
goto continue
end
local handler = self.cmd_handler[cmd]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", cmd)
send_error(ws, sequence, "unknown command")
goto continue
end
local resp, err = handler(params)
if not resp then
send_error(ws, sequence, err)
goto continue
end
send_resp(ws, sequence, resp)
::continue::
end
if fatal_err then
log.error("fatal error in pubsub websocket server, err: ", fatal_err)
end
ws:send_close()
end