in activemq-web-demo/src/main/webapp/mqtt/mqttws31.js [249:411]
WireMessage.prototype.encode = function() {
// Compute the first byte of the fixed header
var first = ((this.type & 0x0f) << 4);
/*
* Now calculate the length of the variable header + payload by adding up the lengths
* of all the component parts
*/
remLength = 0;
topicStrLength = new Array();
// if the message contains a messageIdentifier then we need two bytes for that
if (this.messageIdentifier != undefined)
remLength += 2;
switch(this.type) {
// If this a Connect then we need to include 12 bytes for its header
case MESSAGE_TYPE.CONNECT:
remLength += MqttProtoIdentifier.length + 3;
remLength += UTF8Length(this.clientId) + 2;
if (this.willMessage != undefined) {
remLength += UTF8Length(this.willMessage.destinationName) + 2;
// Will message is always a string, sent as UTF-8 characters with a preceding length.
var willMessagePayloadBytes = this.willMessage.payloadBytes;
if (!(willMessagePayloadBytes instanceof Uint8Array))
willMessagePayloadBytes = new Uint8Array(payloadBytes);
remLength += willMessagePayloadBytes.byteLength +2;
}
if (this.userName != undefined)
remLength += UTF8Length(this.userName) + 2;
if (this.password != undefined)
remLength += UTF8Length(this.password) + 2;
break;
// Subscribe, Unsubscribe can both contain topic strings
case MESSAGE_TYPE.SUBSCRIBE:
first |= 0x02; // Qos = 1;
for ( var i = 0; i < this.topics.length; i++) {
topicStrLength[i] = UTF8Length(this.topics[i]);
remLength += topicStrLength[i] + 2;
}
remLength += this.requestedQos.length; // 1 byte for each topic's Qos
// QoS on Subscribe only
break;
case MESSAGE_TYPE.UNSUBSCRIBE:
first |= 0x02; // Qos = 1;
for ( var i = 0; i < this.topics.length; i++) {
topicStrLength[i] = UTF8Length(this.topics[i]);
remLength += topicStrLength[i] + 2;
}
break;
case MESSAGE_TYPE.PUBLISH:
if (this.payloadMessage.duplicate) first |= 0x08;
first = first |= (this.payloadMessage.qos << 1);
if (this.payloadMessage.retained) first |= 0x01;
destinationNameLength = UTF8Length(this.payloadMessage.destinationName);
remLength += destinationNameLength + 2;
var payloadBytes = this.payloadMessage.payloadBytes;
remLength += payloadBytes.byteLength;
if (payloadBytes instanceof ArrayBuffer)
payloadBytes = new Uint8Array(payloadBytes);
else if (!(payloadBytes instanceof Uint8Array))
payloadBytes = new Uint8Array(payloadBytes.buffer);
break;
case MESSAGE_TYPE.DISCONNECT:
break;
default:
;
}
// Now we can allocate a buffer for the message
var mbi = encodeMBI(remLength); // Convert the length to MQTT MBI format
var pos = mbi.length + 1; // Offset of start of variable header
var buffer = new ArrayBuffer(remLength + pos);
var byteStream = new Uint8Array(buffer); // view it as a sequence of bytes
//Write the fixed header into the buffer
byteStream[0] = first;
byteStream.set(mbi,1);
// If this is a PUBLISH then the variable header starts with a topic
if (this.type == MESSAGE_TYPE.PUBLISH)
pos = writeString(this.payloadMessage.destinationName, destinationNameLength, byteStream, pos);
// If this is a CONNECT then the variable header contains the protocol name/version, flags and keepalive time
else if (this.type == MESSAGE_TYPE.CONNECT) {
byteStream.set(MqttProtoIdentifier, pos);
pos += MqttProtoIdentifier.length;
var connectFlags = 0;
if (this.cleanSession)
connectFlags = 0x02;
if (this.willMessage != undefined ) {
connectFlags |= 0x04;
connectFlags |= (this.willMessage.qos<<3);
if (this.willMessage.retained) {
connectFlags |= 0x20;
}
}
if (this.userName != undefined)
connectFlags |= 0x80;
if (this.password != undefined)
connectFlags |= 0x40;
byteStream[pos++] = connectFlags;
pos = writeUint16 (this.keepAliveInterval, byteStream, pos);
}
// Output the messageIdentifier - if there is one
if (this.messageIdentifier != undefined)
pos = writeUint16 (this.messageIdentifier, byteStream, pos);
switch(this.type) {
case MESSAGE_TYPE.CONNECT:
pos = writeString(this.clientId, UTF8Length(this.clientId), byteStream, pos);
if (this.willMessage != undefined) {
pos = writeString(this.willMessage.destinationName, UTF8Length(this.willMessage.destinationName), byteStream, pos);
pos = writeUint16(willMessagePayloadBytes.byteLength, byteStream, pos);
byteStream.set(willMessagePayloadBytes, pos);
pos += willMessagePayloadBytes.byteLength;
}
if (this.userName != undefined)
pos = writeString(this.userName, UTF8Length(this.userName), byteStream, pos);
if (this.password != undefined)
pos = writeString(this.password, UTF8Length(this.password), byteStream, pos);
break;
case MESSAGE_TYPE.PUBLISH:
// PUBLISH has a text or binary payload, if text do not add a 2 byte length field, just the UTF characters.
byteStream.set(payloadBytes, pos);
break;
// case MESSAGE_TYPE.PUBREC:
// case MESSAGE_TYPE.PUBREL:
// case MESSAGE_TYPE.PUBCOMP:
// break;
case MESSAGE_TYPE.SUBSCRIBE:
// SUBSCRIBE has a list of topic strings and request QoS
for (var i=0; i<this.topics.length; i++) {
pos = writeString(this.topics[i], topicStrLength[i], byteStream, pos);
byteStream[pos++] = this.requestedQos[i];
}
break;
case MESSAGE_TYPE.UNSUBSCRIBE:
// UNSUBSCRIBE has a list of topic strings
for (var i=0; i<this.topics.length; i++)
pos = writeString(this.topics[i], topicStrLength[i], byteStream, pos);
break;
default:
// Do nothing.
}
return buffer;
}