in reinvent-2020/RhythmCloud/analytics/rhythm-analyzer/rhythym-deploy/src/main/java/com/amazonaws/rhythmcloud/deployment/TemporalAnalyzerConstruct.java [26:262]
public TemporalAnalyzerConstruct(
final Construct scope, final String id, final TemporalAnalyzerProps props) {
super(scope, id);
String content =
String.format(
"%s/%s", props.getRhythmCloudArtifactsBucket().getBucketArn(), props.getContentPath());
PolicyStatement s3ReadArtifactPolicy =
PolicyStatement.Builder.create()
.sid("ReadCode")
.effect(Effect.ALLOW)
.actions(Arrays.asList("s3:GetObject", "s3:GetObjectVersion"))
.resources(Collections.singletonList(content))
.build();
PolicyStatement kinesisMetadataReaderPolicyStatement =
PolicyStatement.Builder.create()
.sid("KinesisMetadataReader")
.effect(Effect.ALLOW)
.actions(Arrays.asList("kinesis:ListStreams", "kinesis:ListShards"))
.resources(Collections.singletonList("*"))
.build();
PolicyStatement kinesisWriterPolicyStatement =
PolicyStatement.Builder.create()
.sid("KinesisWriter")
.effect(Effect.ALLOW)
.actions(
Arrays.asList("kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords"))
.resources(
Collections.singletonList(props.getRhythmCloudAnalysisOutputStream().getAttrArn()))
.build();
PolicyStatement kinesisReaderPolicyStatement =
PolicyStatement.Builder.create()
.sid("KinesisReader")
.effect(Effect.ALLOW)
.actions(
Arrays.asList(
"kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords"))
.resources(
Arrays.asList(
props.getRhythmCloudSystemHitStream().getAttrArn(),
props.getRhythmCloudUserHitStream().getAttrArn()))
.build();
PolicyStatement cloudWatchAnalyzerPolicyStatement =
PolicyStatement.Builder.create()
.sid("CloudWatchAnalyzer")
.effect(Effect.ALLOW)
.actions(
Arrays.asList(
"logs:DescribeLogGroups", "logs:DescribeLogStreams", "logs:PutLogEvents"))
.resources(
Collections.singletonList(
String.format("arn:aws:logs:%s:%s:log-group:*", Aws.REGION, Aws.ACCOUNT_ID)))
.build();
// CDK does not support timestream db yet
// TODO: Once you add the right props for TimeStream DB,
// please change the ARN to the ARN of the timestream table
PolicyStatement timeStreamDBPolicyStatement =
PolicyStatement.Builder.create()
.sid("TimeStreamDBAll")
.effect(Effect.ALLOW)
.actions(
Arrays.asList(
"timestream:WriteRecords", "timestream:CancelQuery", "timestream:UpdateTable"))
.resources(Collections.singletonList("*"))
.build();
rhythmAnalyzerRole =
Role.Builder.create(this, "rhythm-cloud-analyzer-role")
.roleName("rhythm-cloud-analyzer-role")
.assumedBy(new ServicePrincipal("kinesisanalytics.amazonaws.com"))
.build();
rhythmAnalyzerPolicy =
ManagedPolicy.Builder.create(this, "rhythm-cloud-analyzer-policy")
.managedPolicyName("rhythm-cloud-analyzer-policy")
.description("Managed policy for rhythym analyzer")
.statements(
Arrays.asList(
s3ReadArtifactPolicy,
kinesisWriterPolicyStatement,
kinesisReaderPolicyStatement,
kinesisMetadataReaderPolicyStatement,
cloudWatchAnalyzerPolicyStatement,
timeStreamDBPolicyStatement))
.build();
rhythmAnalyzerRole.addManagedPolicy(rhythmAnalyzerPolicy);
rhythmAnalyzerLogGroup =
LogGroup.Builder.create(this, "rhythm-analyzer-log-group")
.logGroupName("/aws/kinesis-analytics/rhythm-analyzer")
.retention(RetentionDays.ONE_DAY)
.build();
rhythmAnalyzerLogStream =
rhythmAnalyzerLogGroup.addStream(
"kinesis-analytics-log-stream",
StreamOptions.builder().logStreamName("kinesis-analytics-log-stream").build());
CfnOutput.Builder.create(this, "rhythm-analyzer-log-group-output")
.exportName("rhythm-analyzer-log-group-output")
.description("Rhythm Analyzer Log Group")
.value(rhythmAnalyzerLogGroup.getLogGroupArn())
.build();
props.getRhythmCloudArtifactsBucket().grantRead(rhythmAnalyzerRole);
rhythmAnalyzerLogGroup.grantWrite(rhythmAnalyzerRole);
Map<String, String> systemHitPropertyMap = new HashMap<>();
systemHitPropertyMap.put("aws.region", "us-east-1");
systemHitPropertyMap.put("flink.stream.initpos", "LATEST");
systemHitPropertyMap.put("flink.shard.getrecords.intervalmillis", "1000");
systemHitPropertyMap.put("input.stream.name", props.getRhythmCloudSystemHitStream().getName());
systemHitPropertyMap.put("AggregationEnabled", "false");
CfnApplicationV2.PropertyGroupProperty systemHitPropertyGroup =
CfnApplicationV2.PropertyGroupProperty.builder()
.propertyGroupId("SYSTEMHIT")
.propertyMap(systemHitPropertyMap)
.build();
Map<String, String> userHitPropertyMap = new HashMap<>();
userHitPropertyMap.put("aws.region", "us-east-1");
userHitPropertyMap.put("flink.stream.initpos", "LATEST");
userHitPropertyMap.put("flink.shard.getrecords.intervalmillis", "1000");
userHitPropertyMap.put("input.stream.name", props.getRhythmCloudUserHitStream().getName());
userHitPropertyMap.put("AggregationEnabled", "false");
CfnApplicationV2.PropertyGroupProperty userHitPropertyGroup =
CfnApplicationV2.PropertyGroupProperty.builder()
.propertyGroupId("USERHIT")
.propertyMap(userHitPropertyMap)
.build();
Map<String, String> temporalAnalyzerPropertyMap = new HashMap<>();
temporalAnalyzerPropertyMap.put("aws.region", "us-east-1");
temporalAnalyzerPropertyMap.put(
"output.stream.name", props.getRhythmCloudAnalysisOutputStream().getName());
temporalAnalyzerPropertyMap.put("AggregationEnabled", "true");
CfnApplicationV2.PropertyGroupProperty temporalAnalyzerPropertyGroup =
CfnApplicationV2.PropertyGroupProperty.builder()
.propertyGroupId("TEMPORALANALYSIS")
.propertyMap(temporalAnalyzerPropertyMap)
.build();
// CDK for timestream is not available yet
// Manually create the resources and hard code the values here
// TODO: Replace when CDK is available for timestream
Map<String, String> timeStreamSinkPropertyMap = new HashMap<>();
timeStreamSinkPropertyMap.put("aws.region", "us-east-1");
timeStreamSinkPropertyMap.put("timestream.db.name", "rhythm_cloud");
timeStreamSinkPropertyMap.put("timestream.db.table.name", "rhythm");
timeStreamSinkPropertyMap.put("timestream.db.batch_size", "10");
CfnApplicationV2.PropertyGroupProperty timeStreamSinkPropertyGroup =
CfnApplicationV2.PropertyGroupProperty.builder()
.propertyGroupId("TIMESTREAM")
.propertyMap(timeStreamSinkPropertyMap)
.build();
CfnApplicationV2.ApplicationConfigurationProperty applicationConfigurationProperty =
CfnApplicationV2.ApplicationConfigurationProperty.builder()
.applicationSnapshotConfiguration(
CfnApplicationV2.ApplicationSnapshotConfigurationProperty.builder()
.snapshotsEnabled(true)
.build())
.applicationCodeConfiguration(
CfnApplicationV2.ApplicationCodeConfigurationProperty.builder()
.codeContent(
CfnApplicationV2.CodeContentProperty.builder()
.s3ContentLocation(
CfnApplicationV2.S3ContentLocationProperty.builder()
.bucketArn(props.getRhythmCloudArtifactsBucket().getBucketArn())
.fileKey(props.getContentPath())
.build())
.build())
.codeContentType("ZIPFILE")
.build())
.flinkApplicationConfiguration(
CfnApplicationV2.FlinkApplicationConfigurationProperty.builder()
.parallelismConfiguration(
CfnApplicationV2.ParallelismConfigurationProperty.builder()
.autoScalingEnabled(true)
.parallelism(2)
.parallelismPerKpu(1)
.configurationType("CUSTOM")
.build())
.monitoringConfiguration(
CfnApplicationV2.MonitoringConfigurationProperty.builder()
.logLevel("INFO")
.metricsLevel("TASK")
.configurationType("CUSTOM")
.build())
.build())
.environmentProperties(
CfnApplicationV2.EnvironmentPropertiesProperty.builder()
.propertyGroups(
Arrays.asList(
systemHitPropertyGroup,
userHitPropertyGroup,
timeStreamSinkPropertyGroup))
.build())
.build();
rhythmAnalyzerApplication =
CfnApplicationV2.Builder.create(this, "rhythm-analyzer")
.applicationName("rhythm-analyzer")
.applicationDescription("Analyzes the rhythm")
.serviceExecutionRole(rhythmAnalyzerRole.getRoleArn())
.runtimeEnvironment("FLINK-1_8")
.applicationConfiguration(applicationConfigurationProperty)
.build();
String applicationName = rhythmAnalyzerApplication.getRef();
CfnApplicationCloudWatchLoggingOptionV2.Builder.create(this, "cloud-watch-logging")
.applicationName(applicationName)
.cloudWatchLoggingOption(
CfnApplicationCloudWatchLoggingOptionV2.CloudWatchLoggingOptionProperty.builder()
.logStreamArn(
String.format(
"arn:aws:logs:%s:%s:log-group:%s:log-stream:%s",
Aws.REGION,
Aws.ACCOUNT_ID,
rhythmAnalyzerLogGroup.getLogGroupName(),
rhythmAnalyzerLogStream.getLogStreamName()))
.build())
.build();
CfnOutput.Builder.create(this, "rhythm-analyzer-application-output")
.exportName("rhythm-analyzer-application-output")
.description("Rhythm Analyzer Application")
.value(applicationName)
.build();
}