ClientImpl.prototype._on_socket_message = function()

in activemq-web-demo/src/main/webapp/mqtt/mqttws31.js [1077:1235]


    ClientImpl.prototype._on_socket_message = function (event) {
        this._trace("Client._on_socket_message", event.data);
        
        // Reset the receive ping timer, we now have evidence the server is alive.
        this.receivePinger.reset();
        var byteArray = new Uint8Array(event.data);
        try {
            var wireMessage = decodeMessage(byteArray);
        } catch (error) {
        	this._disconnected(ERROR.INTERNAL_ERROR.code , format(ERROR.INTERNAL_ERROR, [error.message]));
        	return;
        }
        this._trace("Client._on_socket_message", wireMessage);

        switch(wireMessage.type) {
            case MESSAGE_TYPE.CONNACK:
            	this._connectTimeout.cancel();
            	
            	// If we have started using clean session then clear up the local state.
            	if (this.connectOptions.cleanSession) {
    		    	for (key in this._sentMessages) {	    		
    		    	    var sentMessage = this._sentMessages[key];
    					localStorage.removeItem("Sent:"+this._localKey+sentMessage.messageIdentifier);
    		    	}
    				this._sentMessages = {};

    				for (key in this._receivedMessages) {
    					var receivedMessage = this._receivedMessages[key];
    					localStorage.removeItem("Received:"+this._localKey+receivedMessage.messageIdentifier);
    				}
    				this._receivedMessages = {};
            	}
            	// Client connected and ready for business.
            	if (wireMessage.returnCode === 0) {
        	        this.connected = true;
        	        // Jump to the end of the list of hosts and stop looking for a good host.
        	        if (this.connectOptions.hosts)
        	            this.hostIndex = this.connectOptions.hosts.length;
                } else {
                    this._disconnected(ERROR.CONNACK_RETURNCODE.code , format(ERROR.CONNACK_RETURNCODE, [wireMessage.returnCode, CONNACK_RC[wireMessage.returnCode]]));
                    break;
                }
            	
        	    // Resend messages.
            	var sequencedMessages = new Array();
            	for (var msgId in this._sentMessages) {
            	    if (this._sentMessages.hasOwnProperty(msgId))
            	        sequencedMessages.push(this._sentMessages[msgId]);
            	}
          
        	    // Sort sentMessages into the original sent order.
            	var sequencedMessages = sequencedMessages.sort(function(a,b) {return a.sequence - b.sequence;} );
        	    for (var i=0, len=sequencedMessages.length; i<len; i++) {
        	    	var sentMessage = sequencedMessages[i];
        	    	if (sentMessage.type == MESSAGE_TYPE.PUBLISH && sentMessage.pubRecReceived) {
        	    	    var pubRelMessage = new WireMessage(MESSAGE_TYPE.PUBREL, {messageIdentifier:sentMessage.messageIdentifier});
        	            this._schedule_message(pubRelMessage);
        	    	} else {
        	    		this._schedule_message(sentMessage);
        	    	};
        	    }

        	    // Execute the connectOptions.onSuccess callback if there is one.
        	    if (this.connectOptions.onSuccess) {
        	        this.connectOptions.onSuccess({invocationContext:this.connectOptions.invocationContext});
        	    }

        	    // Process all queued messages now that the connection is established. 
        	    this._process_queue();
        	    break;
        
            case MESSAGE_TYPE.PUBLISH:
                this._receivePublish(wireMessage);
                break;

            case MESSAGE_TYPE.PUBACK:
            	var sentMessage = this._sentMessages[wireMessage.messageIdentifier];
                 // If this is a re flow of a PUBACK after we have restarted receivedMessage will not exist.
            	if (sentMessage) {
                    delete this._sentMessages[wireMessage.messageIdentifier];
                    localStorage.removeItem("Sent:"+this._localKey+wireMessage.messageIdentifier);
                    if (this.onMessageDelivered)
                    	this.onMessageDelivered(sentMessage.payloadMessage);
                }
            	break;
            
            case MESSAGE_TYPE.PUBREC:
                var sentMessage = this._sentMessages[wireMessage.messageIdentifier];
                // If this is a re flow of a PUBREC after we have restarted receivedMessage will not exist.
                if (sentMessage) {
                	sentMessage.pubRecReceived = true;
                    var pubRelMessage = new WireMessage(MESSAGE_TYPE.PUBREL, {messageIdentifier:wireMessage.messageIdentifier});
                    this.store("Sent:", sentMessage);
                    this._schedule_message(pubRelMessage);
                }
                break;
            	            	
            case MESSAGE_TYPE.PUBREL:
                var receivedMessage = this._receivedMessages[wireMessage.messageIdentifier];
                localStorage.removeItem("Received:"+this._localKey+wireMessage.messageIdentifier);
                // If this is a re flow of a PUBREL after we have restarted receivedMessage will not exist.
                if (receivedMessage) {
                    this._receiveMessage(receivedMessage);
                    delete this._receivedMessages[wireMessage.messageIdentifier];
                }
                // Always flow PubComp, we may have previously flowed PubComp but the server lost it and restarted.
                pubCompMessage = new WireMessage(MESSAGE_TYPE.PUBCOMP, {messageIdentifier:wireMessage.messageIdentifier});
                this._schedule_message(pubCompMessage);                    
                    
                
                break;

            case MESSAGE_TYPE.PUBCOMP: 
            	var sentMessage = this._sentMessages[wireMessage.messageIdentifier];
            	delete this._sentMessages[wireMessage.messageIdentifier];
                localStorage.removeItem("Sent:"+this._localKey+wireMessage.messageIdentifier);
                if (this.onMessageDelivered)
                	this.onMessageDelivered(sentMessage.payloadMessage);
                break;
                
            case MESSAGE_TYPE.SUBACK:
                var sentMessage = this._sentMessages[wireMessage.messageIdentifier];
                if (sentMessage) {
                	if(sentMessage.timeOut)
                	    sentMessage.timeOut.cancel();
                    if (sentMessage.callback) {
                        sentMessage.callback();
                    }
                    delete this._sentMessages[wireMessage.messageIdentifier];
                }
                break;
        	    
            case MESSAGE_TYPE.UNSUBACK:
            	var sentMessage = this._sentMessages[wireMessage.messageIdentifier];
                if (sentMessage) { 
                	if (sentMessage.timeOut)
                        sentMessage.timeOut.cancel();
                    if (sentMessage.callback) {
                        sentMessage.callback();
                    }
                    delete this._sentMessages[wireMessage.messageIdentifier];
                }

                break;
                
            case MESSAGE_TYPE.PINGRESP:
            	/* The sendPinger or receivePinger may have sent a ping, the receivePinger has already been reset. */
            	this.sendPinger.reset();
            	break;
            	
            case MESSAGE_TYPE.DISCONNECT:
            	// Clients do not expect to receive disconnect packets.
            	this._disconnected(ERROR.INVALID_MQTT_MESSAGE_TYPE.code , format(ERROR.INVALID_MQTT_MESSAGE_TYPE, [wireMessage.type]));
            	break;

            default:
            	this._disconnected(ERROR.INVALID_MQTT_MESSAGE_TYPE.code , format(ERROR.INVALID_MQTT_MESSAGE_TYPE, [wireMessage.type]));
        }; 
    };