sdklab/mean_time_recovery/orchestrator.js (106 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
"use strict";
// eslint-disable-next-line security/detect-child-process
const { fork } = require("child_process");
const fs = require("fs");
const debug = require("debug")("orchestrator");
fs.createWriteStream("dataOutput.log"); // this just clears the file essentially...
function wait(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
async function main() {
let childServer;
let childDevice;
function deviceReceiveMessage() {
// This returns a promise to wait on the response by the device, by setting up a
// listener on the childDevice process. The childDevice should send a message to
// the orchestrator when its message has been acked, so once the message is received the promise will be resolved.
return new Promise((res, rej) => {
const callback = (m) => {
debug("in the deviceReceiveMessage callback");
if (m.responseTime) {
res(m);
}
};
childDevice.on("message", callback);
setTimeout(() => { //DevSkim: reviewed DS172411 on 2022-11-30
childDevice.removeListener("message", callback);
rej();
}, 5000);
});
}
function haveDeviceSendTestMessageToBroker(childDevice) {
// this tells the device that it should send a message to the broker, just to make
// sure they're both working. If there is no response from the broker,
// then this should timeout and reject the promise.
return new Promise((res, rej) => {
const callback = (m) => {
debug(`PARENT got message from childDevice: ${JSON.stringify(m)}`);
if (m.messageAckedOnDevice) {
debug("resolving promise");
res();
}
};
childDevice.on("message", callback);
childDevice.send({ sendPingMessage: true });
setTimeout(() => { //DevSkim: reviewed DS172411 on 2022-11-30
childDevice.removeListener("message", callback);
rej();
}, 5000);
});
}
function setupChildServer() {
const childServer = fork("aedes_server.js");
childServer.on("error", (code) => {
debug(`childServer error ${code}`);
});
childServer.on("close", (code) => {
debug(`childServer closed with code ${code}`);
});
return childServer;
}
function setupChildDevice() {
const childDevice = fork("device.js");
childDevice.on("close", (code) => {
debug(`childDevice closed with code ${code}`);
});
childDevice.on("message", (m) => {
debug(`PARENT got message from childDevice: ${JSON.stringify(m)}`);
});
const childDeviceKeepAlive = 20;
debug("setting keepAlive on childDevice to", childDeviceKeepAlive);
childDevice.send({ setKeepAlive: childDeviceKeepAlive }); // set the keepAlive on the Device
return childDevice;
}
childServer = setupChildServer();
await wait(500);
childDevice = setupChildDevice();
await wait(500);
for (let i = 0; i < 5; i++) {
debug("have device send message to server to ensure connection");
await haveDeviceSendTestMessageToBroker(childDevice);
debug("killing childServer");
childServer.kill();
debug("initiate sendEvent on childDevice");
childDevice.send({ sendMessages: 1 });
debug("waiting for 10 seconds");
await wait(10 * 1000); // wait for 10 seconds
debug("restarting server");
const responseTimePromise = deviceReceiveMessage();
childServer = fork("aedes_server.js");
childServer.on("close", (code) => {
debug(`childServer closed with code ${code}`);
});
const timeServerBackOnline = new Date().getTime();
const fulfilled = await responseTimePromise;
debug(fulfilled);
const metrics = {
messageNumber: fulfilled.messageNumber,
responseTime: fulfilled.responseTime - timeServerBackOnline,
};
fs.appendFile("dataOutput.log", JSON.stringify(metrics) + "\n", function (
err
) {
if (err) throw err;
debug("file written!");
});
await wait(5 * 1000); // some time for things to reset between iterations.
}
}
if (typeof require !== "undefined" && require.main === module) {
main();
}