function _M.wait()

in apisix/core/pubsub.lua [160:235]


--- Run the pubsub event loop on the websocket connection.
--
-- Repeatedly receives frames from `self.ws_server`. Each binary frame is
-- decoded as a `PubSubReq` protobuf message and dispatched to the handler
-- registered in `self.cmd_handler` under the request's command name; the
-- handler result (or its error) is sent back tagged with the request's
-- sequence number. Ping frames are answered with a pong, other non-binary
-- frames are logged and skipped.
--
-- The loop ends when a close frame is received or when a receive error
-- occurs while `ws.fatal` is set; a close frame is then sent to the peer.
--
-- @param self pubsub instance providing `ws_server` and `cmd_handler`
function _M.wait(self)
    local fatal_err
    local ws = self.ws_server
    while true do
        -- read the next raw frame from the websocket connection
        local raw_data, raw_type, err = ws:recv_frame()
        if err then
            -- the websocket object flags the error as fatal:
            -- remember it and leave the event loop
            if ws.fatal then
                fatal_err = err
                break
            end

            -- non-fatal errors only cost this iteration
            log.error("failed to receive websocket frame: ", err)
            goto continue
        end

        -- the peer closed the connection
        if raw_type == "close" then
            break
        end

        -- a ping must be answered with a pong carrying the same payload,
        -- otherwise clients using ping keepalive may drop the connection
        if raw_type == "ping" then
            local bytes, pong_err = ws:send_pong(raw_data)
            if not bytes then
                log.error("failed to send websocket pong frame: ", pong_err)
            end
            goto continue
        end

        -- pubsub messages are carried in binary frames only,
        -- anything else is reported and skipped
        if raw_type ~= "binary" then
            log.warn("pubsub server receive non-binary data, type: ",
                raw_type, ", data: ", raw_data)
            goto continue
        end

        -- make sure the pubsub protobuf state is the active one
        -- before decoding
        if pb.state() ~= pb_state then
            pb.state(pb_state)
        end
        local data, decode_err = pb.decode("PubSubReq", raw_data)
        if not data then
            log.error("pubsub server receives undecodable data, err: ",
                decode_err)
            -- no sequence is available for an undecodable request, use 0
            send_error(ws, 0, "wrong command")
            goto continue
        end

        -- the sequence is echoed back in every reply to this request
        local sequence = data.sequence

        local cmd, params = get_cmd(data)
        if not cmd and not params then
            log.warn("pubsub server receives empty command")
            goto continue
        end

        -- look up the callback registered for this command
        local handler = self.cmd_handler[cmd]
        if not handler then
            log.error("pubsub callback handler not registered for the",
                " command, command: ", cmd)
            send_error(ws, sequence, "unknown command")
            goto continue
        end

        -- run the callback; a falsy result is reported to the client
        -- as an error, anything else as the response
        local resp, handler_err = handler(params)
        if not resp then
            send_error(ws, sequence, handler_err)
            goto continue
        end
        send_resp(ws, sequence, resp)

        ::continue::
    end

    if fatal_err then
        log.error("fatal error in pubsub websocket server, err: ", fatal_err)
    end
    ws:send_close()
end