in refarch/aws-native/common/common_cdk/stream_data_generator.py [0:0]
def __init__(self, scope: core.Construct, id: str,
             log_bucket: _s3.Bucket,
             config_table: _dynamodb.Table,
             tshirt_size: str,
             sink_bucket: _s3.Bucket,
             web_sale_stream: str,
             web_customer_stream: str,
             web_customer_address_stream: str,
             kinesis_key: _kms.Key,
             vpc: _ec2.Vpc,
             **kwargs) -> None:
    super().__init__(scope, id, **kwargs)

    stack = core.Stack.of(self)
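
    # Staging bucket (auto-emptied by the AutoEmptyBucket custom construct) where the
    # EMR job drops the CSV files that are later replayed into Kinesis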
    stream_source_bucket = AutoEmptyBucket(
        self, 'StreamSource',
        bucket_name='ara-stream-source-' + core.Aws.ACCOUNT_ID,
        uuid='95505f50-0276-11eb-adc1-0242ac120002'
    )
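
    # IAM roles: one assumed by the EMR service, one by the cluster's EC2 instances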
    service_role = _iam.Role(
        self, 'StreamEmrServiceRole',
        assumed_by=_iam.ServicePrincipal('elasticmapreduce.amazonaws.com')
    )

    service_role.add_managed_policy(
        _iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AmazonElasticMapReduceRole'))

    cluster_role = _iam.Role(
        self, 'StreamEmrClusterRole',
        assumed_by=_iam.ServicePrincipal('ec2.amazonaws.com')
    )
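
    # Instance-role permissions: Glue Data Catalog access for Spark, read-only access to
    # the datagen binaries, and read/write access to the log, sink, and staging buckets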
    _iam.Policy(
        self, 'StreamEmrClusterPolicy',
        statements=[
            _iam.PolicyStatement(
                actions=[
                    "glue:CreateDatabase",
                    "glue:UpdateDatabase",
                    "glue:DeleteDatabase",
                    "glue:GetDatabase",
                    "glue:GetDatabases",
                    "glue:CreateTable",
                    "glue:UpdateTable",
                    "glue:DeleteTable",
                    "glue:GetTable",
                    "glue:GetTables",
                    "glue:GetTableVersions",
                    "glue:CreatePartition",
                    "glue:BatchCreatePartition",
                    "glue:UpdatePartition",
                    "glue:DeletePartition",
                    "glue:BatchDeletePartition",
                    "glue:GetPartition",
                    "glue:GetPartitions",
                    "glue:BatchGetPartition",
                    "glue:CreateUserDefinedFunction",
                    "glue:UpdateUserDefinedFunction",
                    "glue:DeleteUserDefinedFunction",
                    "glue:GetUserDefinedFunction",
                    "glue:GetUserDefinedFunctions",
                    "cloudwatch:PutMetricData",
                    "dynamodb:ListTables",
                    "s3:HeadBucket",
                    "ec2:Describe*",
                ],
                resources=['*']
            ),
            _iam.PolicyStatement(
                actions=['s3:GetObject'],
                resources=[
                    'arn:aws:s3:::' + ARA_BUCKET_NAME + BINARIES + DataGenConfig.DSDGEN_INSTALL_SCRIPT,
                    'arn:aws:s3:::' + ARA_BUCKET_NAME + BINARIES + DataGenConfig.JAR_FILE
                ]
            ),
            _iam.PolicyStatement(
                actions=['s3:PutObject'],
                resources=[log_bucket.bucket_arn + "/data-generator/*"]
            ),
            _iam.PolicyStatement(
                actions=[
                    "s3:AbortMultipartUpload",
                    "s3:CreateBucket",
                    "s3:DeleteObject",
                    "s3:GetBucketVersioning",
                    "s3:GetObject",
                    "s3:GetObjectTagging",
                    "s3:GetObjectVersion",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts",
                    "s3:PutBucketVersioning",
                    "s3:PutObject",
                    "s3:PutObjectTagging"
                ],
                resources=[
                    sink_bucket.bucket_arn + '/*',
                    sink_bucket.bucket_arn,
                    stream_source_bucket.bucket.bucket_arn + '/*',
                    stream_source_bucket.bucket.bucket_arn
                ]
            )
        ],
        roles=[cluster_role]
    )

    cluster_role.add_managed_policy(
        _iam.ManagedPolicy.from_aws_managed_policy_name('AmazonSSMManagedInstanceCore'))

    _iam.CfnInstanceProfile(
        self, 'StreamEmrClusterInstanceProfile',
        roles=[cluster_role.role_name],
        instance_profile_name=cluster_role.role_name
    )

    # Security groups for the EMR cluster (private subnet)
    # https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-man-sec-groups.html#emr-sg-elasticmapreduce-master-private
    master_sg = _ec2.SecurityGroup(self, 'ElasticMapReduce-Master-Private', vpc=vpc)
    slave_sg = _ec2.SecurityGroup(self, 'ElasticMapReduce-Slave-Private', vpc=vpc)
    service_sg = _ec2.SecurityGroup(self, 'ElasticMapReduce-ServiceAccess', vpc=vpc, allow_all_outbound=False)

    # Service access SG: lets the EMR service communicate with the cluster in the private subnet
    service_sg.add_ingress_rule(master_sg, _ec2.Port.tcp(9443))
    service_sg.add_egress_rule(master_sg, _ec2.Port.tcp(8443))
    service_sg.add_egress_rule(slave_sg, _ec2.Port.tcp(8443))

    # EMR master: all intra-cluster traffic, plus 8443 from the service access SG
    master_sg.add_ingress_rule(master_sg, _ec2.Port.all_icmp())
    master_sg.add_ingress_rule(master_sg, _ec2.Port.all_tcp())
    master_sg.add_ingress_rule(master_sg, _ec2.Port.all_udp())
    master_sg.add_ingress_rule(slave_sg, _ec2.Port.all_icmp())
    master_sg.add_ingress_rule(slave_sg, _ec2.Port.all_tcp())
    master_sg.add_ingress_rule(slave_sg, _ec2.Port.all_udp())
    master_sg.add_ingress_rule(service_sg, _ec2.Port.tcp(8443))

    # EMR slave: all intra-cluster traffic, plus 8443 from the service access SG
    slave_sg.add_ingress_rule(master_sg, _ec2.Port.all_icmp())
    slave_sg.add_ingress_rule(master_sg, _ec2.Port.all_tcp())
    slave_sg.add_ingress_rule(master_sg, _ec2.Port.all_udp())
    slave_sg.add_ingress_rule(slave_sg, _ec2.Port.all_icmp())
    slave_sg.add_ingress_rule(slave_sg, _ec2.Port.all_tcp())
    slave_sg.add_ingress_rule(slave_sg, _ec2.Port.all_udp())
    slave_sg.add_ingress_rule(service_sg, _ec2.Port.tcp(8443))
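
    # Lambda (inlined from datagen_config.py) that builds the EMR step arguments for each
    # iteration, keeping its state in the DynamoDB config table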
    with open('common/common_cdk/lambda/datagen_config.py', 'r') as f:
        lambda_source = f.read()

    configure_datagen_function = _lambda.SingletonFunction(
        self, 'StreamConfigureDatagenLambda',
        uuid="a9904dec-01cf-11eb-adc1-0242ac120002",
        runtime=_lambda.Runtime.PYTHON_3_7,
        code=_lambda.Code.inline(lambda_source),
        handler='index.handler',
        function_name='stream-datagen-config',
        environment={
            'TABLE_NAME': config_table.table_name,
            'JAR_LOCATION': BINARIES_LOCATION + DataGenConfig.JAR_FILE,
        },
        timeout=core.Duration.seconds(10)
    )

    configure_datagen_function.role.add_to_policy(
        _iam.PolicyStatement(
            actions=[
                'dynamodb:GetItem',
                'dynamodb:PutItem',
            ],
            resources=[config_table.table_arn]
        )
    )
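
    # EMR cluster that runs the data generation Spark steps; dsdgen is installed through a
    # bootstrap action and the core fleet is sized by t-shirt size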
    emr_cluster = _emr.CfnCluster(
        self, 'StreamEmrCluster',
        name="StreamDatagenCluster",
        job_flow_role=cluster_role.role_name,
        service_role=service_role.role_name,
        release_label='emr-5.30.1',
        visible_to_all_users=True,
        log_uri=log_bucket.s3_url_for_object() + "/data-generator",
        applications=[
            _emr.CfnCluster.ApplicationProperty(name='hadoop'),
            _emr.CfnCluster.ApplicationProperty(name='spark')
        ],
        bootstrap_actions=[
            _emr.CfnCluster.BootstrapActionConfigProperty(
                name="dsdgen-install",
                script_bootstrap_action=_emr.CfnCluster.ScriptBootstrapActionConfigProperty(
                    path=BINARIES_LOCATION + DataGenConfig.DSDGEN_INSTALL_SCRIPT
                )
            )
        ],
        instances=_emr.CfnCluster.JobFlowInstancesConfigProperty(
            emr_managed_master_security_group=master_sg.security_group_id,
            emr_managed_slave_security_group=slave_sg.security_group_id,
            service_access_security_group=service_sg.security_group_id,
            ec2_subnet_id=vpc.private_subnets[0].subnet_id,
            core_instance_group=_emr.CfnCluster.InstanceGroupConfigProperty(
                instance_count=DataGenConfig.BATCH_CLUSTER_SIZE[tshirt_size],
                instance_type='m5.xlarge'
            ),
            master_instance_group=_emr.CfnCluster.InstanceGroupConfigProperty(
                instance_count=1,
                instance_type='m4.large'
            )
        )
    )
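
    # Step Functions workflow: configure datagen -> submit EMR step (sync) -> update iterator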
    configure_datagen = _sfn_tasks.LambdaInvoke(
        self, "ConfigureDatagenTask",
        lambda_function=configure_datagen_function,
        payload=_sfn.TaskInput.from_text(
            '{'
            '"Param": "stream_iterator",'
            '"Module": "stream",'
            '"SinkBucket": "' + sink_bucket.s3_url_for_object() + '",'
            '"Parallelism": "' + str(int(DataGenConfig.STREAM_DATA_SIZE[tshirt_size]) * 2) + '",'
            '"DataSize": "' + DataGenConfig.STREAM_DATA_SIZE[tshirt_size] + '",'
            '"TmpBucket": "' + str(stream_source_bucket.bucket.s3_url_for_object()) + '"'
            '}'),
        result_path='$.Config'
    )

    add_datagen_step = _sfn.CustomState(
        self, 'StreamAddDataGenStep',
        state_json={
            "Type": "Task",
            "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
            "Parameters": {
                "ClusterId.$": "$.Emr.Cluster.Id",
                "Step": {
                    "Name": "DatagenStep",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args.$": "$.Config.Payload.StepParam"
                    }
                }
            },
            "ResultPath": "$.Step",
            "Next": "StreamUpdateIterator"
        }
    )
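
    # Atomically increments the stream_iterator counter, initializing it to 0 on first use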
    update_iterator = _sfn_tasks.DynamoUpdateItem(
        self, 'StreamUpdateIterator',
        table=config_table,
        key={
            'param': _sfn_tasks.DynamoAttributeValue.from_string('stream_iterator')
        },
        update_expression='SET iterator = if_not_exists(iterator, :start) + :inc',
        expression_attribute_values={
            ":inc": _sfn_tasks.DynamoAttributeValue.from_number(1),
            ":start": _sfn_tasks.DynamoAttributeValue.from_number(0)
        },
        result_path=_sfn.JsonPath.DISCARD
    )

    definition = configure_datagen \
        .next(add_datagen_step) \
        .next(update_iterator)

    datagen_stepfunctions = _sfn.StateMachine(
        self, "StreamDataGenStepFunctions",
        definition=definition,
        timeout=core.Duration.minutes(30)
    )

    datagen_stepfunctions.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                'elasticmapreduce:AddJobFlowSteps',
                'elasticmapreduce:DescribeStep'
            ],
            resources=['*']
        )
    )
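
    # EventBridge rule that starts the workflow every 10 minutes with the cluster id as input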
    step_trigger = _events.Rule(
        self, 'StreamStepTrigger',
        schedule=_events.Schedule.cron(minute='0/10',
                                       hour='*',
                                       month='*',
                                       week_day='*',
                                       year='*')
    )

    step_trigger.add_target(
        _events_targets.SfnStateMachine(
            machine=datagen_stepfunctions,
            input=_events.RuleTargetInput.from_object(
                {"Emr": {"Cluster": {"Id": core.Fn.ref(emr_cluster.logical_id)}}}
            )
        )
    )
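
    # Custom resource that starts a first execution at deploy time, so the pipeline does
    # not have to wait for the first scheduled run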
    with open('common/common_cdk/lambda/stepfunctions_trigger.py', 'r') as f:
        lambda_source = f.read()

    stepfunctions_trigger_lambda = _lambda.SingletonFunction(
        self, 'StreamStepFunctionsTriggerLambda',
        uuid="cf042246-01d0-11eb-adc1-0242ac120002",
        runtime=_lambda.Runtime.PYTHON_3_7,
        code=_lambda.Code.inline(lambda_source),
        handler='index.handler',
        function_name='stepfunctions-stream-datagen-trigger'
    )

    stepfunctions_trigger_lambda.role.add_to_policy(
        _iam.PolicyStatement(
            actions=["states:StartExecution"],
            resources=['*']
        )
    )

    trigger_step_lambda_provider = _custom_resources.Provider(
        self, 'StreamStepFunctionsTriggerLambdaProvider',
        on_event_handler=stepfunctions_trigger_lambda
    )

    core.CustomResource(
        self, 'StreamStepFunctionsTrigger',
        service_token=trigger_step_lambda_provider.service_token,
        properties={
            "stepArn": datagen_stepfunctions.state_machine_arn
        }
    )
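
    # Three copies of the same generator Lambda replay the staged CSV files into Kinesis,
    # each subscribed to a different key prefix (sale, customer, address) in the staging bucket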
    with open('common/common_cdk/lambda/stream_generator.py', 'r') as f:
        lambda_source = f.read()

    sale_stream_generator_lambda = _lambda.Function(
        scope=self,
        id='WebSaleStreamGenerator',
        runtime=_lambda.Runtime.PYTHON_3_7,
        memory_size=2048,
        timeout=core.Duration.minutes(15),
        code=_lambda.Code.inline(lambda_source),
        handler='index.lambda_handler',
        environment={
            'REGION': core.Aws.REGION,
            'STREAM_NAME': web_sale_stream
        }
    )

    stream_source_bucket.bucket.add_event_notification(
        _s3.EventType.OBJECT_CREATED,
        _s3_notifications.LambdaDestination(sale_stream_generator_lambda),
        _s3.NotificationKeyFilter(prefix='sale', suffix='csv')
    )

    sale_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                "s3:DeleteObject",
                "s3:GetObject",
                "s3:ListBucket",
            ],
            resources=[
                stream_source_bucket.bucket.bucket_arn + '/*',
                stream_source_bucket.bucket.bucket_arn
            ]
        )
    )

    sale_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                "kinesis:PutRecords"
            ],
            resources=[stack.format_arn(service='kinesis', resource='stream', resource_name=web_sale_stream)]
        )
    )

    sale_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=['kms:GenerateDataKey'],
            resources=[stack.format_arn(service='kms', resource='key', sep='/', resource_name=kinesis_key.key_id)]
        )
    )
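
    # Same generator code, pointed at the customer stream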
    customer_stream_generator_lambda = _lambda.Function(
        scope=self,
        id='WebCustomerStreamGenerator',
        runtime=_lambda.Runtime.PYTHON_3_7,
        memory_size=2048,
        timeout=core.Duration.minutes(15),
        code=_lambda.Code.inline(lambda_source),
        handler='index.lambda_handler',
        environment={
            'REGION': core.Aws.REGION,
            'STREAM_NAME': web_customer_stream
        }
    )

    stream_source_bucket.bucket.add_event_notification(
        _s3.EventType.OBJECT_CREATED,
        _s3_notifications.LambdaDestination(customer_stream_generator_lambda),
        _s3.NotificationKeyFilter(prefix='customer', suffix='csv')
    )

    customer_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                "s3:DeleteObject",
                "s3:GetObject",
                "s3:ListBucket",
            ],
            resources=[
                stream_source_bucket.bucket.bucket_arn + '/*',
                stream_source_bucket.bucket.bucket_arn
            ]
        )
    )

    customer_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                "kinesis:PutRecords"
            ],
            resources=[stack.format_arn(service='kinesis', resource='stream', resource_name=web_customer_stream)]
        )
    )

    customer_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=['kms:GenerateDataKey'],
            resources=[stack.format_arn(service='kms', resource='key', sep='/', resource_name=kinesis_key.key_id)]
        )
    )
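
    # Same generator code, pointed at the customer address stream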
    address_stream_generator_lambda = _lambda.Function(
        scope=self,
        id='WebCustomerAddressStreamGenerator',
        runtime=_lambda.Runtime.PYTHON_3_7,
        memory_size=2048,
        timeout=core.Duration.minutes(15),
        code=_lambda.Code.inline(lambda_source),
        handler='index.lambda_handler',
        environment={
            'REGION': core.Aws.REGION,
            'STREAM_NAME': web_customer_address_stream
        }
    )

    stream_source_bucket.bucket.add_event_notification(
        _s3.EventType.OBJECT_CREATED,
        _s3_notifications.LambdaDestination(address_stream_generator_lambda),
        _s3.NotificationKeyFilter(prefix='address', suffix='csv')
    )

    address_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                "s3:DeleteObject",
                "s3:GetObject",
                "s3:ListBucket",
            ],
            resources=[
                stream_source_bucket.bucket.bucket_arn + '/*',
                stream_source_bucket.bucket.bucket_arn
            ]
        )
    )

    address_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=[
                "kinesis:PutRecords"
            ],
            resources=[stack.format_arn(service='kinesis', resource='stream', resource_name=web_customer_address_stream)]
        )
    )

    address_stream_generator_lambda.add_to_role_policy(
        _iam.PolicyStatement(
            actions=['kms:GenerateDataKey'],
            resources=[stack.format_arn(service='kms', resource='key', sep='/', resource_name=kinesis_key.key_id)]
        )
    )