in dynamodb-api/src/main/java/com/awssamples/dynamodbapi/SqsToIotCoreStack.java [148:252]
public SqsToIotCoreStack(final Construct parent, final String name) {
super(parent, name);
// Build all of the necessary JARs
build();
Map<String, String> arguments = CdkHelper.getArguments();
// Get the queue ARN from the arguments or build the message queue and extract the ARN
String inboundSqsQueueArn = getInboundQueueArn()
.getOrElse(() -> buildInboundMessageQueue().getQueueArn());
String outboundSqsQueueArn = getOutboundQueueArn()
.getOrElse(() -> buildOutboundMessageQueue().getQueueArn());
Table messageTable = buildMessageTable();
String uuidName = getUuid()
.getOrElse(this::getDefaultUuidNameAndWarn);
String messageIdName = getMessageId()
.getOrElse(this::getDefaultMessageNameIdAndWarn);
// Resources to move messages from SQS to DynamoDB
Role moveFromSqsToDynamoDbRole = buildMoveFromSqsToDynamoDbRole(inboundSqsQueueArn, messageTable);
Function moveFromSqsToDynamoDb = buildMoveFromSqsToDynamoDbLambda(inboundSqsQueueArn, uuidName, messageIdName, messageTable, moveFromSqsToDynamoDbRole);
buildEventSourceMapping(inboundSqsQueueArn, moveFromSqsToDynamoDb);
// Default environment for all functions just contains the table name
Map<String, String> defaultEnvironment = getDefaultEnvironment(messageTable);
// Resources to get messages from DynamoDB and publish to IoT Core
List<PolicyStatement> getItemPolicyStatementsForMessageTable = List.of(getGetItemPolicyStatementForTable(messageTable));
Role getRole = buildIotEventRoleForTopicPrefix(GET_ROLE_NAME, GET_RESPONSE_TOPIC_PREFIX, getItemPolicyStatementsForMessageTable, List.empty());
String getRequestTopicFilter = String.join(MQTT_TOPIC_SEPARATOR, GET_REQUEST_TOPIC_PREFIX, MULTI_LEVEL_MQTT_WILDCARD);
Map<String, String> getMessageLambdaEnvironment = getGetMessageLambdaEnvironment();
Function getLambda = buildIotEventLambda(GET_LAMBDA_FUNCTION_NAME, getRole, defaultEnvironment, getMessageLambdaEnvironment, GET_METHOD_NAME);
CfnTopicRule getRule = buildIotEventRule(this, GET_RULE_NAME, getLambda, DEFAULT_SELECT_CLAUSE, getRequestTopicFilter);
allowIotTopicRuleToInvokeLambdaFunction(this, getRule, getLambda, GET_LAMBDA_PERMISSIONS);
// Resources to query for available messages from DynamoDB and publish to IoT Core
List<PolicyStatement> queryPolicyStatementsForMessageTable = List.of(getQueryPolicyStatementForTable(messageTable));
Role queryRole = buildIotEventRoleForTopicPrefix(QUERY_ROLE_NAME, QUERY_RESPONSE_TOPIC_PREFIX, queryPolicyStatementsForMessageTable, List.empty());
String queryRequestTopicFilter = String.join(MQTT_TOPIC_SEPARATOR, QUERY_REQUEST_TOPIC_PREFIX, MULTI_LEVEL_MQTT_WILDCARD);
Map<String, String> queryMessageLambdaEnvironment = getQueryMessageLambdaEnvironment();
Function queryLambda = buildIotEventLambda(QUERY_LAMBDA_FUNCTION_NAME, queryRole, defaultEnvironment, queryMessageLambdaEnvironment, QUERY_METHOD_NAME);
CfnTopicRule queryRule = buildIotEventRule(this, QUERY_RULE_NAME, queryLambda, DEFAULT_SELECT_CLAUSE, queryRequestTopicFilter);
allowIotTopicRuleToInvokeLambdaFunction(this, queryRule, queryLambda, QUERY_LAMBDA_PERMISSIONS);
// Resources to delete messages from DynamoD0
List<PolicyStatement> deleteItemPolicyStatementsForMessageTable = List.of(getDeleteItemPolicyStatementForTable(messageTable));
Role deleteRole = buildIotEventRoleForTopicPrefix(DELETE_ROLE_NAME, DELETE_RESPONSE_TOPIC_PREFIX, deleteItemPolicyStatementsForMessageTable, List.empty());
String deleteRequestTopicFilter = String.join(MQTT_TOPIC_SEPARATOR, DELETE_REQUEST_TOPIC_PREFIX, MULTI_LEVEL_MQTT_WILDCARD);
Map<String, String> deleteMessageLambdaEnvironment = getDeleteMessageLambdaEnvironment();
Function deleteLambda = buildIotEventLambda(DELETE_LAMBDA_FUNCTION_NAME, deleteRole, defaultEnvironment, deleteMessageLambdaEnvironment, DELETE_METHOD_NAME);
CfnTopicRule deleteRule = buildIotEventRule(this, DELETE_RULE_NAME, deleteLambda, DEFAULT_SELECT_CLAUSE, deleteRequestTopicFilter);
allowIotTopicRuleToInvokeLambdaFunction(this, deleteRule, deleteLambda, DELETE_LAMBDA_PERMISSIONS);
// Resources to get the next message ID from DynamoDB
List<PolicyStatement> nextItemPolicyStatementsForMessageTable = List.of(getNextItemPolicyStatementForTable(messageTable));
Role nextRole = buildIotEventRoleForTopicPrefix(NEXT_ROLE_NAME, NEXT_RESPONSE_TOPIC_PREFIX, nextItemPolicyStatementsForMessageTable, List.empty());
String nextRequestTopicFilter = String.join(MQTT_TOPIC_SEPARATOR, NEXT_REQUEST_TOPIC_PREFIX, MULTI_LEVEL_MQTT_WILDCARD);
Map<String, String> nextMessageLambdaEnvironment = getNextMessageLambdaEnvironment();
Function nextLambda = buildIotEventLambda(NEXT_LAMBDA_FUNCTION_NAME, nextRole, defaultEnvironment, nextMessageLambdaEnvironment, NEXT_METHOD_NAME);
CfnTopicRule nextRule = buildIotEventRule(this, NEXT_RULE_NAME, nextLambda, DEFAULT_SELECT_CLAUSE, nextRequestTopicFilter);
allowIotTopicRuleToInvokeLambdaFunction(this, nextRule, nextLambda, NEXT_LAMBDA_PERMISSIONS);
// Resources to get the list of devices with unread messages from DynamoDB
Map<String, String> devicesMessageLambdaEnvironment = getDevicesMessageLambdaEnvironment();
Function devicesLambda;
if (dynamoDbDeviceLookup) {
List<PolicyStatement> devicesDynamoDbPolicyStatementsForMessageTable = List.of(getDevicesPolicyStatementForTable(messageTable));
Role dynamoDbDevicesRole = buildIotEventRoleForTopic(DEVICES_ROLE_NAME, DEVICES_RESPONSE_TOPIC, devicesDynamoDbPolicyStatementsForMessageTable, List.empty());
Function dynamoDbDevicesLambda = buildIotEventLambda(DEVICES_LAMBDA_FUNCTION_NAME, dynamoDbDevicesRole, defaultEnvironment, devicesMessageLambdaEnvironment, DEVICES_DYNAMODB_METHOD_NAME);
devicesLambda = dynamoDbDevicesLambda;
} else {
List<PolicyStatement> devicesRegistryPolicyStatements = List.of(searchIndexPolicyStatement);
Role registryDevicesRole = buildIotEventRoleForTopic(DEVICES_ROLE_NAME, DEVICES_RESPONSE_TOPIC, devicesRegistryPolicyStatements, List.empty());
Function registryDevicesLambda = buildIotEventLambda(DEVICES_LAMBDA_FUNCTION_NAME, registryDevicesRole, defaultEnvironment, devicesMessageLambdaEnvironment, DEVICES_REGISTRY_METHOD_NAME);
devicesLambda = registryDevicesLambda;
}
CfnTopicRule devicesRule = buildIotEventRule(this, DEVICES_RULE_NAME, devicesLambda, DEFAULT_SELECT_CLAUSE, DEVICES_REQUEST_TOPIC);
allowIotTopicRuleToInvokeLambdaFunction(this, devicesRule, devicesLambda, DEVICES_LAMBDA_PERMISSIONS);
// Resources to send messages to SQS
List<PolicyStatement> sendToSqsPolicyStatements = List.of(getSqsSendMessagePolicyStatement(outboundSqsQueueArn));
Role sendRole = buildIotEventRoleForTopicPrefix(SEND_ROLE_NAME, SEND_RESPONSE_TOPIC_PREFIX, sendToSqsPolicyStatements, List.empty());
String sendRequestTopicFilter = String.join(MQTT_TOPIC_SEPARATOR, SEND_REQUEST_TOPIC_PREFIX, MULTI_LEVEL_MQTT_WILDCARD);
Map<String, String> sendMessageLambdaEnvironment = getSendMessageLambdaEnvironment(outboundSqsQueueArn);
Function sendLambda = buildIotEventLambda(SEND_LAMBDA_FUNCTION_NAME, sendRole, defaultEnvironment, sendMessageLambdaEnvironment, SEND_METHOD_NAME);
CfnTopicRule sendRule = buildIotEventRule(this, SEND_RULE_NAME, sendLambda, DEFAULT_SELECT_CLAUSE, sendRequestTopicFilter);
allowIotTopicRuleToInvokeLambdaFunction(this, sendRule, sendLambda, SEND_LAMBDA_PERMISSIONS);
// Resources to receive notifications and add devices to the registry
List<PolicyStatement> notificationCreateThingPolicyStatements = List.of(createThingPolicyStatement,
updateThingShadowPolicyStatement,
updateThingGroupsForThingPolicyStatement,
createThingGroupPolicyStatement);
Role notificationRole = buildRoleAssumedByLambda(this, NOTIFICATION_ROLE_NAME, notificationCreateThingPolicyStatements, List.of());
Function notificationLambda = buildIotEventLambda(NOTIFICATION_LAMBDA_FUNCTION_NAME, notificationRole, defaultEnvironment, NOTIFICATION_METHOD_NAME);
CfnTopicRule notificationRule = buildIotEventRule(this, NOTIFICATION_RULE_NAME, notificationLambda, DEFAULT_SELECT_CLAUSE, NOTIFICATION_TOPIC_FILTER);
allowIotTopicRuleToInvokeLambdaFunction(this, notificationRule, notificationLambda, NOTIFICATION_LAMBDA_PERMISSIONS);
}