common/transport/amqp/src/amqp_message.ts (147 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

'use strict';

import { uuid_to_string as rheaUuidToString, string_to_uuid as rheaStringToUuid, types as rheaTypes, message as rheaMessage } from 'rhea';
import { Message } from 'azure-iot-common';
/* tslint:disable:variable-name */
/* tslint:enable:variable-name: [true, "check-format", allow-leading-underscore", "ban-keywords", "allow-snake-case"]*/

/** Matches a lower-case, hyphenated UUID string (versions 1-5, variant digit 8/9/a/b). Hoisted so it is compiled once. */
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/;

/** Property key that is routed to `message_annotations` instead of `application_properties`. */
const DT_SUBJECT = 'dt-subject';

/** Message annotation key used to carry `Message.interfaceId`. */
const INTERFACE_ID_ANNOTATION = 'iothub-interface-id';

/** Application property key used to carry `Message.ack`. */
const ACK_PROPERTY = 'iothub-ack';

/** Reserved application property key whose value is forced to a `rhea` int. */
const STATUS_PROPERTY = 'IoThub-status';

/**
 * Converts an identifier to the form that should be handed to `rhea`.
 *
 * @param uuidString Identifier taken from the transport-agnostic message.
 * @returns The result of `rhea`'s `string_to_uuid` when `uuidString` is a string matching
 *          {@link UUID_REGEX}; otherwise `uuidString` itself, untouched.
 */
function encodeUuid(uuidString: string): any {
  if (typeof uuidString === 'string' && UUID_REGEX.test(uuidString)) {
    //
    // The rhea library will only serialize the uuid with an encoding of 0x98 if the uuid property is actually
    // a 16 byte buffer.
    //
    return rheaStringToUuid(uuidString);
  }
  return uuidString;
}

/**
 * Converts an identifier received from `rhea` back to the form stored on a `Message`.
 *
 * The rhea library will de-serialize an encoded uuid (0x98) as a 16 byte buffer.
 * Since common messages should only have type string it is safe to decode
 * these as strings.
 *
 * @param id Value of `message_id` or `correlation_id` on the incoming AMQP message.
 * @returns The output of `rhea`'s `uuid_to_string` when `id` is a `Buffer` of length 16; otherwise `id` unchanged.
 */
function decodeUuid(id: string): string {
  // The field is declared as `string` but may hold a Buffer at runtime, hence the `any` view.
  const raw = id as any;
  if (raw instanceof Buffer && raw.length === 16) {
    return rheaUuidToString(raw);
  }
  return id;
}

/**
 * Sets one entry in `amqpMessage.message_annotations`, creating the map first if it does not exist yet.
 *
 * @param amqpMessage Message being built.
 * @param key Annotation name.
 * @param value Annotation value.
 */
function setMessageAnnotation(amqpMessage: AmqpMessage, key: string, value: any): void {
  if (!amqpMessage.message_annotations) {
    amqpMessage.message_annotations = {};
  }
  amqpMessage.message_annotations[key] = value;
}

/**
 * Sets one entry in `amqpMessage.application_properties`, creating the map first if it does not exist yet.
 *
 * @param amqpMessage Message being built.
 * @param key Property name.
 * @param value Property value.
 */
function setApplicationProperty(amqpMessage: AmqpMessage, key: string, value: any): void {
  /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_008: [If needed, the created AmqpMessage object shall have a property of type Object named application_properties.]*/
  if (!amqpMessage.application_properties) {
    amqpMessage.application_properties = {};
  }
  amqpMessage.application_properties[key] = value;
}

/**
 * @private
 * @class           module:azure-iot-amqp-base.AmqpMessage
 * @classdesc       AMQP-specific message class used to prepare a [azure-iot-common.Message]{@link module:azure-iot-common.Message}
 *                  before it's sent over the wire using the AMQP protocol.
 */
export class AmqpMessage {
  to?: string;
  absolute_expiry_time?: Date;
  message_id?: string;
  correlation_id?: string;
  reply_to?: string;
  content_type?: undefined | 'application/json';
  content_encoding?: undefined | 'utf-8' | 'utf-16' | 'utf-32';
  body: any;
  application_properties: {
    [key: string]: any;
  };
  message_annotations: {
    [key: string]: any;
  };

  /**
   * @method          module:azure-iot-amqp-base.AmqpMessage.fromMessage
   * @description     Takes a azure-iot-common.Message{@link module:azure-iot-common.Message} object and creates an AMQP message from it.
   *
   * Only truthy fields of `message` are copied. `interfaceId` and the `dt-subject` property are written to
   * `message_annotations`; `ack` and every other property are written to `application_properties`.
   * Neither map is created unless something is written to it.
   *
   * @param {module:azure-iot-common.Message}   message   The {@linkcode Message} object from which to create an AMQP message.
   * @returns {AmqpMessage} The newly created AMQP message.
   * @throws {ReferenceError} If `message` is falsy.
   */
  static fromMessage(message: Message): AmqpMessage {
    if (!message) throw new ReferenceError('message is \'' + message + '\'');

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_001: [The fromMessage method shall create a new instance of AmqpMessage.]*/
    const amqpMessage = new AmqpMessage();

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_003: [If the message argument has a to property, the AmqpMessage object shall have a property named to with the same value.]*/
    if (message.to) {
      amqpMessage.to = message.to;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_004: [If the message argument has an expiryTimeUtc property, the AmqpMessage object shall have a property named absolute_expiry_time with the same value.]*/
    if (message.expiryTimeUtc) {
      amqpMessage.absolute_expiry_time = message.expiryTimeUtc;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_007: [If the message argument has a messageId property, the AmqpMessage object shall have a property named messageId with the same value.]*/
    if (message.messageId) {
      amqpMessage.message_id = encodeUuid(message.messageId);
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_010: [If the `message` argument has a `correlationId` property, the `AmqpMessage` object shall have a property named `correlation_id` with the same value.]*/
    if (message.correlationId) {
      /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_012: [If the `Message.correlationId` property is a UUID, the AMQP type of the `AmqpMessage.correlation_id` property shall be forced to Buffer[16].]*/
      amqpMessage.correlation_id = encodeUuid(message.correlationId);
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_014: [If the `message` argument has a `contentEncoding` property, the `AmqpMessage` object shall have a property named `content_encoding` with the same value.]*/
    if (message.contentEncoding) {
      amqpMessage.content_encoding = message.contentEncoding;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_015: [If the `message` argument has a `contentType` property, the `AmqpMessage` object shall have a property named `content_type` with the same value.]*/
    if (message.contentType) {
      amqpMessage.content_type = message.contentType;
    }

    if (message.interfaceId) {
      setMessageAnnotation(amqpMessage, INTERFACE_ID_ANNOTATION, message.interfaceId);
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_009: [If the message argument has an ack property, the application_properties property of the AmqpMessage object shall have a property named iothub-ack with the same value.]*/
    if (message.ack) {
      setApplicationProperty(amqpMessage, ACK_PROPERTY, message.ack);
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_13_001: [ If message.properties is truthy, then all the properties in it shall be copied to the application_properties property of the AmqpMessage object. ]*/
    if (message.properties) {
      const props = message.properties;
      const propsCount = props.count();
      for (let index = 0; index < propsCount; index++) {
        const item = props.getItem(index);
        if (!item) {
          continue;
        }

        if (item.key === DT_SUBJECT) {
          // 'dt-subject' travels as a message annotation, not as an application property.
          setMessageAnnotation(amqpMessage, DT_SUBJECT, item.value);
        } else {
          /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_013: [If one of the property key is `IoThub-status`, this property is reserved and shall be forced to an `int` `rhea` type.]*/
          // Explicit radix 10 so the status is always parsed as a decimal number.
          const val = (item.key === STATUS_PROPERTY) ? rheaTypes.wrap_int(parseInt(item.value, 10)) : item.value;
          setApplicationProperty(amqpMessage, item.key, val);
        }
      }
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_005: [If message.getData() is truthy, the AmqpMessage object shall have a property named body with the value returned from message.getData().]*/
    // getData() is only used to detect whether a payload was set; the body that is actually sent is
    // message.getBytes() wrapped in a rhea data section.
    const body = message.getData();
    if (body !== undefined) {
      amqpMessage.body = rheaMessage.data_section(message.getBytes());
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_05_006: [The generated AmqpMessage object shall be returned to the caller.]*/
    return amqpMessage;
  }

  /**
   * @method          module:azure-iot-amqp-base.AmqpMessage.toMessage
   * @description     Creates a transport-agnostic azure-iot-common.Message{@link module:azure-iot-common.Message} object from transport-specific AMQP message.
   *
   * The original AMQP message is kept on the result as `transportObj`.
   *
   * @param {AmqpMessage}   message   The {@linkcode AmqpMessage} object from which to create an Message.
   * @returns {Message} The newly created transport-agnostic message.
   * @throws {ReferenceError} If `amqpMessage` is falsy.
   */
  static toMessage(amqpMessage: AmqpMessage): Message {
    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_001: [The `toMessage` method shall throw if the `amqpMessage` argument is falsy.]*/
    if (!amqpMessage) {
      throw new ReferenceError('amqpMessage cannot be \'' + amqpMessage + '\'');
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_009: [The `toMessage` method shall set the `Message.data` of the message to the content of the `AmqpMessage.body.content` property.]*/
    const msg: Message = amqpMessage.body ? new Message(amqpMessage.body.content) : new Message(undefined);

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_005: [The `toMessage` method shall set the `Message.to` property to the `AmqpMessage.to` value if it is present.]*/
    if (amqpMessage.to) {
      msg.to = amqpMessage.to;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_006: [The `toMessage` method shall set the `Message.expiryTimeUtc` property to the `AmqpMessage.absolute_expiry_time` value if it is present.]*/
    if (amqpMessage.absolute_expiry_time) {
      msg.expiryTimeUtc = amqpMessage.absolute_expiry_time;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_004: [The `toMessage` method shall set the `Message.messageId` property to the `AmqpMessage.message_id` value if it is present.]*/
    if (amqpMessage.message_id) {
      msg.messageId = decodeUuid(amqpMessage.message_id);
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_003: [The `toMessage` method shall set the `Message.correlationId` property to the `AmqpMessage.correlation_id` value if it is present.]*/
    if (amqpMessage.correlation_id) {
      msg.correlationId = decodeUuid(amqpMessage.correlation_id);
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_016: [The `toMessage` method shall set the `Message.contentType` property to the `AmqpMessage.content_type` value if it is present. ]*/
    if (amqpMessage.content_type) {
      msg.contentType = amqpMessage.content_type;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_017: [The `toMessage` method shall set the `Message.contentEncoding` property to the `AmqpMessage.content_encoding` value if it is present. ]*/
    if (amqpMessage.content_encoding) {
      msg.contentEncoding = amqpMessage.content_encoding;
    }

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_007: [The `toMessage` method shall convert the user-defined `AmqpMessage.applicationProperties` to a `Properties` collection stored in `Message.properties`.]*/
    if (amqpMessage.application_properties) {
      const appProps = amqpMessage.application_properties;
      // Object.keys yields own enumerable keys only, so inherited properties are never copied.
      for (const key of Object.keys(appProps)) {
        /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_008: [The `toMessage` method shall set the `Message.ack` property to the `AmqpMessage.application_properties['iothub-ack']` value if it is present.]*/
        if (key === ACK_PROPERTY) {
          msg.ack = appProps[key];
        } else {
          msg.properties.add(key, appProps[key]);
        }
      }
    }

    // Keep a reference to the raw AMQP message on the transport-agnostic one.
    msg.transportObj = amqpMessage;

    /*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_002: [The `toMessage` method shall return a `Message` object.]*/
    return msg;
  }
}