in activemq-web-demo/src/main/webapp/mqtt/mqttws31.js [1077:1235]
/**
 * Handle one message event from the socket.
 *
 * The payload in event.data is wrapped in a Uint8Array, decoded with
 * decodeMessage() and dispatched on the decoded message type:
 *  - CONNACK: cancels the connect timeout, clears local state when
 *    connectOptions.cleanSession is set, then either reports the failure via
 *    _disconnected() or marks the client connected, re-schedules the stored
 *    sent messages in sequence order, calls connectOptions.onSuccess and
 *    processes the queue.
 *  - PUBLISH: handed to _receivePublish().
 *  - PUBACK / PUBREC / PUBREL / PUBCOMP: advance the acknowledgement flow of
 *    the message with the matching messageIdentifier.
 *  - SUBACK / UNSUBACK: cancel the request time-out and invoke its callback.
 *  - PINGRESP: resets the send pinger.
 *  - anything else (including DISCONNECT): reported via _disconnected().
 *
 * A decode failure is reported via _disconnected() with ERROR.INTERNAL_ERROR.
 *
 * @param {Object} event - socket message event; only event.data is read.
 * @private
 */
ClientImpl.prototype._on_socket_message = function (event) {
    this._trace("Client._on_socket_message", event.data);
    // Reset the receive ping timer, we now have evidence the server is alive.
    this.receivePinger.reset();
    var byteArray = new Uint8Array(event.data);
    var wireMessage;
    try {
        wireMessage = decodeMessage(byteArray);
    } catch (error) {
        this._disconnected(ERROR.INTERNAL_ERROR.code, format(ERROR.INTERNAL_ERROR, [error.message]));
        return;
    }
    this._trace("Client._on_socket_message", wireMessage);

    // Declared once up front: every name below used to be either re-declared
    // per case or, for key and pubCompMessage, leaked as an implicit global.
    var key, sentMessage, receivedMessage, pubRelMessage, pubCompMessage;

    switch (wireMessage.type) {
    case MESSAGE_TYPE.CONNACK:
        this._connectTimeout.cancel();
        // If we have started using clean session then clear up the local state.
        // Note this happens before the return code is inspected.
        if (this.connectOptions.cleanSession) {
            for (key in this._sentMessages) {
                if (this._sentMessages.hasOwnProperty(key)) {
                    sentMessage = this._sentMessages[key];
                    localStorage.removeItem("Sent:" + this._localKey + sentMessage.messageIdentifier);
                }
            }
            this._sentMessages = {};
            for (key in this._receivedMessages) {
                if (this._receivedMessages.hasOwnProperty(key)) {
                    receivedMessage = this._receivedMessages[key];
                    localStorage.removeItem("Received:" + this._localKey + receivedMessage.messageIdentifier);
                }
            }
            this._receivedMessages = {};
        }
        if (wireMessage.returnCode !== 0) {
            // The server refused the connection.
            this._disconnected(ERROR.CONNACK_RETURNCODE.code, format(ERROR.CONNACK_RETURNCODE, [wireMessage.returnCode, CONNACK_RC[wireMessage.returnCode]]));
            break;
        }
        // Client connected and ready for business.
        this.connected = true;
        // Jump to the end of the list of hosts and stop looking for a good host.
        if (this.connectOptions.hosts)
            this.hostIndex = this.connectOptions.hosts.length;

        // Resend messages, sorted into the original sent order.
        var sequencedMessages = [];
        for (key in this._sentMessages) {
            if (this._sentMessages.hasOwnProperty(key))
                sequencedMessages.push(this._sentMessages[key]);
        }
        sequencedMessages.sort(function (a, b) { return a.sequence - b.sequence; });
        for (var i = 0, len = sequencedMessages.length; i < len; i++) {
            sentMessage = sequencedMessages[i];
            if (sentMessage.type == MESSAGE_TYPE.PUBLISH && sentMessage.pubRecReceived) {
                // PUBREC was already seen for this publish, so resume the flow at PUBREL.
                pubRelMessage = new WireMessage(MESSAGE_TYPE.PUBREL, {messageIdentifier: sentMessage.messageIdentifier});
                this._schedule_message(pubRelMessage);
            } else {
                this._schedule_message(sentMessage);
            }
        }
        // Execute the connectOptions.onSuccess callback if there is one.
        if (this.connectOptions.onSuccess) {
            this.connectOptions.onSuccess({invocationContext: this.connectOptions.invocationContext});
        }
        // Process all queued messages now that the connection is established.
        this._process_queue();
        break;

    case MESSAGE_TYPE.PUBLISH:
        this._receivePublish(wireMessage);
        break;

    case MESSAGE_TYPE.PUBACK:
        sentMessage = this._sentMessages[wireMessage.messageIdentifier];
        // If this is a re flow of a PUBACK after we have restarted sentMessage will not exist.
        if (sentMessage) {
            delete this._sentMessages[wireMessage.messageIdentifier];
            localStorage.removeItem("Sent:" + this._localKey + wireMessage.messageIdentifier);
            if (this.onMessageDelivered)
                this.onMessageDelivered(sentMessage.payloadMessage);
        }
        break;

    case MESSAGE_TYPE.PUBREC:
        sentMessage = this._sentMessages[wireMessage.messageIdentifier];
        // If this is a re flow of a PUBREC after we have restarted sentMessage will not exist.
        if (sentMessage) {
            sentMessage.pubRecReceived = true;
            pubRelMessage = new WireMessage(MESSAGE_TYPE.PUBREL, {messageIdentifier: wireMessage.messageIdentifier});
            this.store("Sent:", sentMessage);
            this._schedule_message(pubRelMessage);
        }
        break;

    case MESSAGE_TYPE.PUBREL:
        receivedMessage = this._receivedMessages[wireMessage.messageIdentifier];
        localStorage.removeItem("Received:" + this._localKey + wireMessage.messageIdentifier);
        // If this is a re flow of a PUBREL after we have restarted receivedMessage will not exist.
        if (receivedMessage) {
            this._receiveMessage(receivedMessage);
            delete this._receivedMessages[wireMessage.messageIdentifier];
        }
        // Always flow PubComp, we may have previously flowed PubComp but the server lost it and restarted.
        pubCompMessage = new WireMessage(MESSAGE_TYPE.PUBCOMP, {messageIdentifier: wireMessage.messageIdentifier});
        this._schedule_message(pubCompMessage);
        break;

    case MESSAGE_TYPE.PUBCOMP:
        sentMessage = this._sentMessages[wireMessage.messageIdentifier];
        delete this._sentMessages[wireMessage.messageIdentifier];
        localStorage.removeItem("Sent:" + this._localKey + wireMessage.messageIdentifier);
        // As with PUBACK/PUBREC, a re flowed PUBCOMP may refer to a message we no
        // longer hold; only report delivery when there is a message to report.
        if (sentMessage && this.onMessageDelivered)
            this.onMessageDelivered(sentMessage.payloadMessage);
        break;

    case MESSAGE_TYPE.SUBACK:
    case MESSAGE_TYPE.UNSUBACK:
        // Both acknowledgements are handled identically.
        sentMessage = this._sentMessages[wireMessage.messageIdentifier];
        if (sentMessage) {
            if (sentMessage.timeOut)
                sentMessage.timeOut.cancel();
            if (sentMessage.callback) {
                sentMessage.callback();
            }
            delete this._sentMessages[wireMessage.messageIdentifier];
        }
        break;

    case MESSAGE_TYPE.PINGRESP:
        /* The sendPinger or receivePinger may have sent a ping, the receivePinger has already been reset. */
        this.sendPinger.reset();
        break;

    case MESSAGE_TYPE.DISCONNECT:
        // Clients do not expect to receive disconnect packets.
        this._disconnected(ERROR.INVALID_MQTT_MESSAGE_TYPE.code, format(ERROR.INVALID_MQTT_MESSAGE_TYPE, [wireMessage.type]));
        break;

    default:
        this._disconnected(ERROR.INVALID_MQTT_MESSAGE_TYPE.code, format(ERROR.INVALID_MQTT_MESSAGE_TYPE, [wireMessage.type]));
    }
};