in source/resources/helper/lib/kinesis-helper.js [41:147]
kinesisHelper.prototype.createKinesisAnalyticsApp = function(settings, cb) {
let _params = {
ApplicationName: settings.appName,
ApplicationDescription: 'This Amazon Kinesis Analaytics application detects anomalous oil temperatures for the connected vehicle platform',
Inputs: [{
InputSchema: {
RecordColumns: [{
Name: 'ts',
SqlType: 'TIMESTAMP',
Mapping: '$.timestamp'
}, {
Name: 'trip_id',
SqlType: 'VARCHAR(64)',
Mapping: '$.trip_id'
}, {
Name: 'vin',
SqlType: 'VARCHAR(32)',
Mapping: '$.vin'
}, {
Name: 'name',
SqlType: 'VARCHAR(32)',
Mapping: '$.name'
}, {
Name: 'val',
SqlType: 'DOUBLE',
Mapping: '$.value'
}, {
Name: 'latitude',
SqlType: 'DOUBLE',
Mapping: '$.latitude'
}, {
Name: 'longitude',
SqlType: 'DOUBLE',
Mapping: '$.longitude'
}],
RecordFormat: {
RecordFormatType: 'JSON',
MappingParameters: {
JSONMappingParameters: {
RecordRowPath: '$'
}
}
},
RecordEncoding: 'UTF-8'
},
NamePrefix: 'SOURCE_SQL_STREAM',
KinesisFirehoseInput: {
ResourceARN: settings.deliveryStream,
RoleARN: settings.roleArn
}
}],
Outputs: [{
DestinationSchema: {
RecordFormatType: 'JSON'
},
Name: 'ANOMALY_OUTPUT_STREAM',
KinesisStreamsOutput: {
ResourceARN: settings.anomalyStream,
RoleARN: settings.roleArn
}
}],
ApplicationCode: 'CREATE OR REPLACE STREAM "TEMP_STREAM" ("ts" TIMESTAMP,"oil_temp" DOUBLE,"trip_id" VARCHAR(64),"vin" VARCHAR(32),"ANOMALY_SCORE" DOUBLE);\r\n\
CREATE OR REPLACE STREAM "ANOMALY_STREAM" ("ts" TIMESTAMP,"oil_temp" DOUBLE,"trip_id" VARCHAR(64),"vin" VARCHAR(32),"ANOMALY_SCORE" DOUBLE);\r\n\
CREATE OR REPLACE STREAM "ANOMALY_OUTPUT_STREAM" ("ts" TIMESTAMP,"value" DOUBLE,"trip_id" VARCHAR(64),"vin" VARCHAR(32),"ANOMALY_SCORE" DOUBLE, "telemetric" VARCHAR(32),"low_limit" INT);\r\n\
-- Option 1 - Compute an anomaly score for each oil temperature record in the input stream using unsupervised machine learning algorithm, Random Cut Forest\r\n\
--CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "ts","val", "trip_id", "vin", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST(CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001" WHERE "name" = \'oil_temp\' AND "val" > 240),10,10,10,1));\r\n\
-- Option 2 - Compute an anomaly score for each oil temperaure record in the input stream, where the anomaly is a simple diff between the observed oil temperature and a predefined average\r\n\
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "ts","val", "trip_id", "vin", ("val"-250) as ANOMALY_SCORE FROM "SOURCE_SQL_STREAM_001" WHERE "name" = \'oil_temp\';\r\n\
CREATE OR REPLACE PUMP "ANOMALY_STREAM_PUMP" AS INSERT INTO "ANOMALY_STREAM" SELECT STREAM * FROM "TEMP_STREAM";\r\n\
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "ANOMALY_OUTPUT_STREAM" SELECT STREAM *,\'oil_temp\' as telemetric, 250 as low_limit FROM "TEMP_STREAM" WHERE ANOMALY_SCORE > 30;\r\n'
};
let kinesisAnalytics = new AWS.KinesisAnalytics();
kinesisAnalytics.createApplication(_params, function(err, data) {
if (err) {
console.log(err);
console.log('Could not create Amazon Kinesis Analytics application');
return cb(err, null);
} else {
console.log('Amazon Kinesis Analytics application was successfully created');
kinesisAnalytics.describeApplication({
ApplicationName: settings.appName
}, function(err, appData) {
if (err) {
console.log(err);
console.log('Could not start Amazon Kinesis Analytics application');
return cb(err, null);
} else {
console.log('Found Amazon Kinesis Analytics application input Id');
let appInputId = appData.ApplicationDetail.InputDescriptions[0].InputId;
startKinesisAnalyticsApp({
appName: settings.appName,
appInputId: appInputId
}, function(err, startData) {
if (err) {
return cb(err, null);
}
return cb(null, startData);
});
}
});
}
});
};