in source/lambda/filter-kinesis-stream/machine-data-parser.ts [42:103]
/**
 * Decodes a base64-encoded JSON payload and converts every message in it into
 * an `IMachineDataMessage`.
 *
 * The decoded payload is matched against a message format configuration item,
 * which names the keys that hold the message list and each message's timestamp,
 * alias and value. Each alias is split on the configured delimiter: the final
 * segment is used as the attribute name and the remaining segments, joined back
 * together with the same delimiter, are used as the machine ID.
 *
 * Messages for machines without a configuration are still returned; they are
 * simply never flagged as status or production count messages.
 *
 * @param base64EncodedData Base64-encoded string containing a JSON document
 * @returns The parsed messages, in the order they appear in the payload
 * @throws Error if the data cannot be decoded as JSON, if no message format
 * configuration matches the payload, or if the configured messages key does
 * not hold an array
 */
public parseData(base64EncodedData: string): IParsedMachineData {
    let decodedData: any;

    try {
        decodedData = JSON.parse(Buffer.from(base64EncodedData, 'base64').toString());
    } catch (err) {
        // Log the specific error
        console.error(err);
        throw new Error('Unable to decode data');
    }

    const matchingMessageFormat = this.getMatchingMessageFormatConfigItem(decodedData);
    if (!matchingMessageFormat) {
        throw new Error('Unable to parse the record. Did not find a matching message format configuration');
    }

    // Fail with a descriptive error instead of an opaque TypeError when the
    // configured key does not hold a list of messages.
    const messages: unknown = decodedData[matchingMessageFormat.msgFormatDataMessagesKeyName];
    if (!Array.isArray(messages)) {
        throw new Error(`Unable to parse the record. Expected "${matchingMessageFormat.msgFormatDataMessagesKeyName}" to be an array of messages`);
    }

    // Multiple values can be configured as representing machine UP/DOWN/IDLE.
    // The dashboard will direct users to use a comma-separated list when
    // setting multiple values. A status value that is missing (or not a string)
    // matches nothing rather than aborting the whole record.
    const isConfiguredValue = (configuredValues: unknown, candidate: string): boolean =>
        typeof configuredValues === 'string' &&
        configuredValues.split(',').map(item => item.trim()).includes(candidate);

    const output: IParsedMachineData = { messages: [] };

    // Process each message in the data payload
    for (const msg of messages) {
        // Strict parsing: the value must match the configured format exactly
        const timestamp = moment(msg[matchingMessageFormat.msgFormatDataMessageTimestampKeyName], matchingMessageFormat.msgFormatDataMessageTimestampFormat, true);
        const splitAlias = (msg[matchingMessageFormat.msgFormatDataMessageAliasKeyName] as string).split(matchingMessageFormat.msgFormatDataAliasDelimiter);

        // The final portion of the alias is the attribute
        const attributeName = splitAlias.pop();

        // Join the rest of the alias back together and use as a unique machine ID
        const machineId = splitAlias.join(matchingMessageFormat.msgFormatDataAliasDelimiter);

        if (this._verboseLogging && !timestamp.isValid()) {
            // An invalid moment yields NaN from unix(); surface it instead of passing it on silently
            console.log(`Unable to parse a valid timestamp for ${machineId}${matchingMessageFormat.msgFormatDataAliasDelimiter}${attributeName}`);
        }

        const machineConfig = this.machineConfigs[machineId];
        if (!machineConfig && this._verboseLogging) {
            console.log(`Did not find a machine configuration for ${machineId}`);
        }

        const machineDataMsg: IMachineDataMessage = {
            timestamp: timestamp.unix(),
            attributeName,
            machineId,
            isStatusMsg: machineConfig ? (machineConfig.machineStatusTagName === attributeName) : false,
            isProductionCountMsg: machineConfig ? (machineConfig.machineProductionCountTagName === attributeName) : false,
            value: msg[matchingMessageFormat.msgFormatDataMessageValueKeyName]
        };

        // isStatusMsg can only be true when a machine configuration was found;
        // the explicit check keeps that visible to the type checker.
        if (machineConfig && machineDataMsg.isStatusMsg) {
            // Configured values are strings, so compare against the stringified value
            const statusValue = `${machineDataMsg.value}`;

            // UP takes precedence over DOWN, which takes precedence over IDLE.
            // When nothing matches, machineStatus is left unset.
            if (isConfiguredValue(machineConfig.machineStatusUpValue, statusValue)) {
                machineDataMsg.machineStatus = MachineStatus.UP;
            } else if (isConfiguredValue(machineConfig.machineStatusDownValue, statusValue)) {
                machineDataMsg.machineStatus = MachineStatus.DOWN;
            } else if (isConfiguredValue(machineConfig.machineStatusIdleValue, statusValue)) {
                machineDataMsg.machineStatus = MachineStatus.IDLE;
            }
        }

        output.messages.push(machineDataMsg);
    }

    return output;
}