in cdk/src/main/java/com/amazonaws/services/kinesisanalytics/AmazonKinesisDataAnalyticsApacheFlinkServerSentEventsSseCdkStack.java [225:262]
/**
 * Creates the "KinesisAnalyticsInit" singleton Lambda function and a custom resource
 * that invokes it with the VPC configuration for the given application.
 *
 * <p>The function source is read from {@code lambda/KinesisAnalyticsSetup.js} and inlined.
 * It receives the application name and the output stream name through its environment and
 * is allowed to call {@code UpdateApplication}, {@code DescribeApplication} and
 * {@code AddApplicationVpcConfiguration} on the application's ARN only.
 *
 * <p>The custom resource passes a {@code VpcConfiguration} property holding security group
 * and subnet IDs. When {@code createVPC} is set, these come from the stack's own security
 * group and the first private subnet of its VPC; otherwise they come from the
 * {@code securityGroupIdsParam} / {@code subnetIdsParam} values.
 *
 * @param application the application whose name and ARN are handed to the function and its policy
 */
private void createKinesisAnalyticsInit(final Application application) { //NOPMD - suppressed MethodArgumentCouldBeFinal - TODO explain reason for suppression
    // The singleton's uuid is its identity and must be identical on every synthesis.
    // A randomly generated value produced a different function on each synth, which
    // defeated the singleton and churned the function and custom resource on every deploy.
    final String singletonUuid = "a3f1c2d4-5b6e-4f70-8a91-b2c3d4e5f607";

    final String functionCode = readFile("lambda/KinesisAnalyticsSetup.js");

    // Values exposed to the function as environment variables.
    final ConcurrentHashMap<String, String> environmentProperties = new ConcurrentHashMap<>();
    environmentProperties.put("ApplicationName", application.getApplicationName());
    environmentProperties.put("OutputStream", outputDataStream.getStreamName());

    // NOTE(review): NODEJS_12_X looks like an outdated Lambda runtime; upgrading requires
    // checking that the inline script still works on a newer Node.js runtime - TODO confirm.
    final SingletonFunction lambdaFunction = SingletonFunction.Builder.create(this, "KinesisAnalyticsInit")
            .description("Initialize the Amazon Kinesis Data Analytics application")
            .code(Code.fromInline(functionCode))
            .handler("index.handler")
            .timeout(Duration.seconds(30))
            .runtime(software.amazon.awscdk.services.lambda.Runtime.NODEJS_12_X)
            .uuid(singletonUuid)
            .environment(environmentProperties)
            .build();

    // Permissions are scoped to this one application rather than a wildcard resource.
    lambdaFunction.addToRolePolicy(PolicyStatement.Builder.create()
            .resources(List.of(application.getApplicationArn()))
            .actions(List.of(
                    "kinesisanalytics:UpdateApplication",
                    "kinesisanalytics:DescribeApplication",
                    "kinesisanalytics:AddApplicationVpcConfiguration"
            ))
            .effect(Effect.ALLOW)
            .build());

    // Use the stack-created network when createVPC is set, otherwise the supplied parameters.
    // Only the first private subnet of the stack-created VPC is passed on.
    final ConcurrentHashMap<String, Object> vpcConfig = new ConcurrentHashMap<>();
    vpcConfig.put("SecurityGroupIds", this.createVPC ? List.of(securityGroup.getSecurityGroupId()) : securityGroupIdsParam.getValueAsList());
    vpcConfig.put("SubnetIds", this.createVPC ? List.of(vpc.getPrivateSubnets().get(0).getSubnetId()) : subnetIdsParam.getValueAsList());

    final ConcurrentHashMap<String, Object> resourceProperties = new ConcurrentHashMap<>();
    resourceProperties.put("VpcConfiguration", vpcConfig);

    // The custom resource is backed directly by the function's ARN as its service token.
    CustomResource.Builder.create(this, "KinesisAnalyticsInitResourceVPC")
            .properties(resourceProperties)
            .serviceToken(lambdaFunction.getFunctionArn())
            .build();
}