in source/lambda/greengrass-deployer/index.ts [83:207]
/**
 * Deploys a connection to the Greengrass group.
 *
 * The steps run strictly in sequence:
 * 1. Fetch the latest Greengrass group version.
 * 2. Protocol-specific setup: for OPC DA, create a collector Lambda function and its alias;
 *    for OPC UA, add a source to the IoT SiteWise gateway capability configuration.
 *    Any other protocol needs no extra setup here.
 * 3. Create the publisher Lambda function and its alias.
 * 4. Update the Greengrass group definitions and create a new group version from them.
 * 5. Add the connection through `dynamoDbHandler.addConnection`, stop the running connections,
 *    create the Greengrass deployment, and start the connections again.
 * 6. For protocols other than OPC DA, update the connection control to `START`.
 *
 * When any step fails, the error is logged, an error message is published to the connection's
 * IoT topic, `deleteConnection` is called as the rollback, and a `GreengrassDeployerError` is thrown.
 *
 * @param connectionDefinition The definition of the connection to deploy
 * @throws `LambdaError` named `GreengrassDeployerError` when the deployment fails. Note that the
 * IoT publish and the rollback are awaited without their own guard, so if either of them rejects,
 * that rejection propagates instead.
 */
async function deployConnection(connectionDefinition: ConnectionBuilderTypes.ConnectionDefinition): Promise<void> {
  logger.log(LoggingLevel.DEBUG, `Deploying a connection: ${JSON.stringify(connectionDefinition, null, 2)}`);

  try {
    const {
      connectionName,
      area,
      machineName,
      process,
      protocol,
      sendDataToIoTSitewise,
      sendDataToIoTTopic,
      sendDataToKinesisDataStreams,
      siteName
    } = connectionDefinition;

    // Both the collector and the publisher functions receive the same stream name.
    const streamName = `m2c2_${connectionName}_stream`;
    // Destination flags are passed as 'Yes' when enabled and left undefined otherwise.
    const yesWhenEnabled = (enabled: unknown): string | undefined => (enabled ? 'Yes' : undefined);

    const greengrassHandler = new GreengrassHandler({
      connectionName,
      area,
      machineName,
      process,
      protocol,
      sendDataToIoTSitewise,
      sendDataToIoTTopic,
      sendDataToKinesisDataStreams,
      siteName
    });
    const greengrassGroupVersion = await greengrassHandler.getGreengrassGroupLatestVersion();

    // Stays unassigned unless the protocol is OPC DA.
    let collectorAliasArn: string;

    switch (protocol) {
      case ConnectionBuilderTypes.MachineProtocol.OPCDA: {
        // OPC DA connections need a dedicated collector Lambda function.
        const collectorFunction = await lambdaHalder.createLambdaFunction({
          environmentVariables: {
            AREA: area,
            CONNECTION_GG_STREAM_NAME: streamName,
            MACHINE_NAME: machineName,
            PROCESS: process,
            SITE_NAME: siteName
          },
          functionType: LambdaHandlerTypes.LambdaFunctionType.COLLECTOR,
          connectionName,
          protocol
        });
        const collectorAlias = await lambdaHalder.createFunctionAlias(
          collectorFunction.FunctionName,
          collectorFunction.FunctionArn
        );
        collectorAliasArn = collectorAlias.AliasArn;
        break;
      }
      case ConnectionBuilderTypes.MachineProtocol.OPCUA: {
        // OPC UA connections are registered as a source in the IoT SiteWise gateway capability configuration.
        const opcUa = connectionDefinition.opcUa;
        await iotSitewiseHandler.addGatwayCapabilityConfigurationSource({
          connectionName,
          serverName: opcUa.serverName,
          machineIp: opcUa.machineIp,
          port: opcUa.port
        });
        break;
      }
      default:
        break;
    }

    // Every connection gets a publisher Lambda function, regardless of the protocol.
    const publisherFunction = await lambdaHalder.createLambdaFunction({
      environmentVariables: {
        AREA: area,
        CONNECTION_GG_STREAM_NAME: streamName,
        CONNECTION_NAME: connectionName,
        KINESIS_STREAM_NAME: KINESIS_STREAM,
        MACHINE_NAME: machineName,
        PROCESS: process,
        PROTOCOL: protocol,
        SEND_TO_IOT_TOPIC: yesWhenEnabled(sendDataToIoTTopic),
        SEND_TO_KINESIS_STREAM: yesWhenEnabled(sendDataToKinesisDataStreams),
        SEND_TO_SITEWISE: yesWhenEnabled(sendDataToIoTSitewise),
        SITE_NAME: siteName
      },
      functionType: LambdaHandlerTypes.LambdaFunctionType.PUBLISHER,
      connectionName,
      protocol
    });
    const publisherAlias = await lambdaHalder.createFunctionAlias(
      publisherFunction.FunctionName,
      publisherFunction.FunctionArn
    );

    // Build a new Greengrass group version out of the updated definitions.
    const definitionVersionArns = await greengrassHandler.updateGreengrassGroupDefinitions({
      greengrassGroupVersion,
      publisherLambdaFunctionAliasArn: publisherAlias.AliasArn,
      collectorLambdaFunctionAliasArn: collectorAliasArn
    });
    const groupVersion = await greengrassHandler.createGreengrassGroupVersion(definitionVersionArns);

    const addedConnection = await dynamoDbHandler.addConnection(connectionDefinition);

    // Running connections are stopped before the deployment and started again afterwards.
    // A new OPC DA connection is started together with them.
    const restartTargets = await stopRunningConnections();

    if (addedConnection.protocol === ConnectionBuilderTypes.MachineProtocol.OPCDA) {
      restartTargets.push(addedConnection);
    }

    await greengrassHandler.createGreengrassDeployment(groupVersion.Version);
    await startConnections(restartTargets);

    // Non-OPC DA connections are not in the restart list, so only their control value is updated.
    if (addedConnection.protocol !== ConnectionBuilderTypes.MachineProtocol.OPCDA) {
      await dynamoDbHandler.updateConnection({
        connectionName: addedConnection.connectionName,
        control: ConnectionBuilderTypes.ConnectionControl.START
      });
    }
  } catch (error) {
    logger.log(LoggingLevel.ERROR, 'An error occurred while deploying a connection to the Greengrass group. Error: ', error);

    const genericMessage = 'An error occurred while deploying a connection to the Greengrass group.';
    // Only a LambdaError message is forwarded to the IoT topic; anything else is reported generically.
    const reportedMessage = error instanceof LambdaError ? error.message : genericMessage;

    logger.log(LoggingLevel.ERROR, 'Trying to roll back...');

    await iotHandler.publishIoTTopicMessage(connectionDefinition.connectionName, IoTHandlerTypes.IotMessageTypes.ERROR, {
      error: reportedMessage
    });
    await deleteConnection(connectionDefinition.connectionName, connectionDefinition.protocol);

    // The thrown error always carries the generic message, even when a specific one was published.
    throw new LambdaError({
      message: genericMessage,
      name: 'GreengrassDeployerError'
    });
  }
}