content/AzIotHubClient.js (243 lines of code) (raw):

const WEB_SOCKET = '/$iothub/websocket?iothub-no-client-cert=true' const DEVICE_TWIN_RES_TOPIC = '$iothub/twin/res/#' const DEVICE_TWIN_GET_TOPIC = '$iothub/twin/GET/?$rid=' const DEVICE_TWIN_PUBLISH_TOPIC = '$iothub/twin/PATCH/properties/reported/?$rid=' const DIRECT_METHOD_TOPIC = '$iothub/methods/POST/#' const DEVICE_TWIN_DESIRED_PROP_RES_TOPIC = '$iothub/twin/PATCH/properties/desired/#' const DIRECT_METHOD_RESPONSE_TOPIC = '$iothub/methods/res/{status}/?$rid=' const DEVICE_C2D_COMMAND_TOPIC = 'devices/{deviceid}/messages/devicebound/#' /** * * @param {String} key * @param {String} msg * @returns {Promise<string>} */ const createHmac = async (key, msg) => { const keyBytes = Uint8Array.from(window.atob(key), c => c.charCodeAt(0)) const msgBytes = Uint8Array.from(msg, c => c.charCodeAt(0)) const cryptoKey = await window.crypto.subtle.importKey( 'raw', keyBytes, { name: 'HMAC', hash: 'SHA-256' }, true, ['sign'] ) const signature = await window.crypto.subtle.sign('HMAC', cryptoKey, msgBytes) return window.btoa(String.fromCharCode(...new Uint8Array(signature))) } /** * @param {string} resourceUri * @param {string} signingKey * @param {string | null} policyName * @param {number} expiresInMins * @returns {Promise<string>} */ async function generateSasToken (resourceUri, signingKey, policyName, expiresInMins) { resourceUri = encodeURIComponent(resourceUri) let expires = (Date.now() / 1000) + expiresInMins * 60 expires = Math.ceil(expires) const toSign = resourceUri + '\n' + expires const hmac = await createHmac(signingKey, toSign) const base64UriEncoded = encodeURIComponent(hmac) let token = 'SharedAccessSignature sr=' + resourceUri + '&sig=' + base64UriEncoded + '&se=' + expires if (policyName) token += '&skn=' + policyName return token } export class AzIoTHubClient { /** * @param {string} host * @param {string} deviceId * @param {string} key * @param {string} [modelId] */ constructor (host, deviceId, key, modelId) { this.connected = false this.host = host this.deviceId = deviceId this.key = key this.modelId = modelId this.rid = 0 this.client = new Paho.MQTT.Client(this.host, Number(443), WEB_SOCKET, this.deviceId) /** * @description Callback when a direct method invocation is received * @param {string} method * @param {string} payload * @param {number} rid */ this.directMethodCallback = (method, payload, rid) => { } /** * @description Callback when a C2D command invocation is received * @param {string} methodName * @param {string} payload */ this.c2dCallback = (methodName, payload) => { } /** * @description Callback for desired properties upadtes * @param {string} desired */ this.desiredPropCallback = (desired) => { } /** * @param {any} err */ this.disconnectCallback = (err) => { console.log(err) } /** * @param {any} twin */ this._onReadTwinCompleted = (twin) => { } this._onUpdateTwinCompleted = () => { } } /** * @description Connects to Azure IoT Hub using MQTT over websockets */ async connect () { let userName = `${this.host}/${this.deviceId}/?api-version=2020-05-31-preview` if (this.modelId) userName += `&model-id=${this.modelId}` const password = await generateSasToken(`${this.host}/devices/${this.deviceId}`, this.key, null, 60) return new Promise((resolve, reject) => { this.client.onConnectionLost = (err) => { console.log(err) this.connected = false this.disconnectCallback(err) reject(err) } const willMsg = new Paho.MQTT.Message('') willMsg.destinationName = 'willMessage' this.client.onMessageArrived = (/** @type {Paho.MQTT.Message} */ m) => { const destinationName = m.destinationName const payloadString = m.payloadString // console.log('On Msg Arrived to ' + destinationName) // console.log(payloadString) if (destinationName === '$iothub/twin/res/200/?$rid=' + this.rid) { this._onReadTwinCompleted(payloadString) } if (destinationName.startsWith('$iothub/twin/res/204/?$rid=' + this.rid)) { this._onUpdateTwinCompleted() } if (destinationName.indexOf('methods/POST') > 1) { const destParts = destinationName.split('/') // $iothub/methods/POST/myCommand/?$rid=2 const methodName = destParts[3] const ridPart = destParts[4] const rid = parseInt(ridPart.split('=')[1]) this.directMethodCallback(methodName, payloadString, rid) } if (destinationName.indexOf('twin/PATCH/properties/desired') > 1) { this.desiredPropCallback(payloadString) } if (destinationName.startsWith(DEVICE_C2D_COMMAND_TOPIC.slice(0, -1).replace('{deviceid}', this.deviceId))) { const url = decodeURIComponent(destinationName) const methodName = url.substring(url.indexOf("&method-name=")+13) this.c2dCallback(methodName, payloadString) } } this.client.connect({ useSSL: true, userName: userName, timeout: 120, cleanSession: true, invocationContext: {}, keepAliveInterval: 120, willMessage: willMsg, password: password, onSuccess: () => { this.connected = true console.log('Connected !!') this.client.subscribe(DEVICE_TWIN_RES_TOPIC, { qos: 0, invocationContext: {}, onSuccess: () => { }, onFailure: (err) => { throw err }, timeout: 120 }) this.client.subscribe(DIRECT_METHOD_TOPIC, { qos: 0, invocationContext: {}, onSuccess: () => { }, onFailure: (err) => { throw err }, timeout: 120 }) this.client.subscribe(DEVICE_TWIN_DESIRED_PROP_RES_TOPIC, { qos: 0, invocationContext: {}, onSuccess: () => { }, onFailure: (err) => { throw err }, timeout: 120 }) this.client.subscribe(DEVICE_C2D_COMMAND_TOPIC.replace('{deviceid}', this.deviceId), { qos: 0, invocationContext: {}, onSuccess: () => { }, onFailure: (err) => { throw err }, timeout: 120 }) resolve(this.deviceId) } }) }) } /** * @return {Promise<DeviceTwin>} */ getTwin () { this.rid = Date.now() // console.log(this.rid) const readTwinMessage = new Paho.MQTT.Message('') readTwinMessage.destinationName = DEVICE_TWIN_GET_TOPIC + this.rid this.client.send(readTwinMessage) return new Promise((resolve, reject) => { /** * @param {string} twin */ this._onReadTwinCompleted = (twin) => { resolve(JSON.parse(twin)) } }) } /** * @param {string} reportedProperties */ updateTwin (reportedProperties) { this.rid = Date.now() // console.log(this.rid) const reportedTwinMessage = new Paho.MQTT.Message(reportedProperties) reportedTwinMessage.destinationName = DEVICE_TWIN_PUBLISH_TOPIC + this.rid this.client.send(reportedTwinMessage) return new Promise((resolve, reject) => { this._onUpdateTwinCompleted = () => { resolve(204) } }) } /** * @param {string} payload */ sendTelemetry (payload) { const telemetryMessage = new Paho.MQTT.Message(payload) telemetryMessage.destinationName = `devices/${this.deviceId}/messages/events/` this.client.send(telemetryMessage) } /** * @param {{ (methodName: string, payload:string, rid:number): void}} directMethodCallback */ setDirectMethodCallback (directMethodCallback) { this.directMethodCallback = directMethodCallback } /** * @param {{ (methodName: string, payload:string, rid:number): void}} c2dCallback */ setCloudToDeviceCallback (c2dCallback) { this.c2dCallback = c2dCallback } /** * @param {string} methodName * @param {string} payload * @param {number} rid * @param {number} status */ commandResponse (methodName, payload, rid, status) { const response = new Paho.MQTT.Message(payload) response.destinationName = DIRECT_METHOD_RESPONSE_TOPIC.replace('{status}', status.toString()) + rid.toString() this.client.send(response) } /** * @param {{ (desired: string): void}} desiredPropCallback */ setDesiredPropertyCallback (desiredPropCallback) { this.desiredPropCallback = desiredPropCallback } } export /** * @param {{ [x: string]: any; }} propValues * @param {number} ac * @param {number} av */ const ackPayload = (propValues, ac, av) => { const isObject = o => o === Object(o) const payload = {} Object.keys(propValues).filter(k => k !== '$version').forEach(k => { const value = propValues[k] if (isObject(value)) { Object.keys(value).filter(k => k !== '__t').forEach(p => { const desiredValue = value[p] propValues[k][p] = { ac, av, value: desiredValue } payload[k] = propValues[k] }) } else { payload[k] = { ac, av, value } } }) return payload }