in infrastructure/emr_launch/cluster_definition.py [0:0]
def __init__(self, scope: core.Construct, id: str, config: dict, **kwargs):
    """Provision the VPC, S3 buckets and EMR launch resources for the cluster.

    In order, this constructor:
    - creates a VPC with S3 and DynamoDB gateway endpoints;
    - creates the log, artifact and two output S3 buckets;
    - builds the EMR profile and grants it access to the input and output buckets;
    - uploads the local bootstrap-action scripts to the artifact bucket;
    - builds the cluster configuration and the launch function;
    - calls ``self.outputs()``.

    Args:
        scope: Parent construct.
        id: Construct id within ``scope``.
        config: Mapping whose items are copied onto ``self`` as attributes
            before anything else runs. The body later reads these attributes
            from ``self``: ``CLUSTER_NAME``, ``INPUT_BUCKETS``,
            ``MASTER_INSTANCE_TYPE``, ``CORE_INSTANCE_TYPE``,
            ``CORE_INSTANCE_COUNT``, ``RELEASE_LABEL``, ``APPLICATIONS``,
            ``CONFIGURATION``, ``CORE_INSTANCE_MARKET``,
            ``TASK_INSTANCE_TYPE``, ``TASK_INSTANCE_MARKET`` and
            ``TASK_INSTANCE_COUNT``. NOTE(review): presumably these are
            supplied via ``config`` (or as class attributes) — verify
            against callers.
        **kwargs: Forwarded unchanged to the base-class constructor.
    """
    from os.path import isfile, join

    super().__init__(scope, id, **kwargs)
    # Expose every config entry as an attribute, e.g. self.CLUSTER_NAME.
    for k, v in config.items():
        setattr(self, k, v)

    vpc = ec2.Vpc(
        self,
        "MyEMRVpc",
        max_azs=3,
    )
    vpc.add_gateway_endpoint(
        "s3-gateway",
        service=ec2.GatewayVpcEndpointAwsService.S3,
    )
    vpc.add_gateway_endpoint(
        "dynamo-gateway",
        service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
    )
    # The cluster is placed in the first private subnet of the VPC.
    subnet = vpc.select_subnets(
        subnet_type=ec2.SubnetType.PRIVATE
    ).subnets[0]
    self._cluster_name = self.CLUSTER_NAME

    # Bucket names are suffixed with the account id; NOTE(review):
    # self.account is presumably provided by the base class (e.g. Stack).
    log_bucket = s3.Bucket(
        self,
        "emr_logs",
        bucket_name=f"emr-logs-{self.account}",
        encryption=s3.BucketEncryption.S3_MANAGED,
    )
    artifact_bucket = s3.Bucket(
        self,
        "emr_artifacts",
        bucket_name=f"emr-artifacts-{self.account}",
        encryption=s3.BucketEncryption.S3_MANAGED,
    )
    self.synchronized_bucket = s3.Bucket(
        self,
        "emr_output",
        bucket_name=f"synchronized-rosbag-topics-{self.account}",
        encryption=s3.BucketEncryption.S3_MANAGED,
    )
    self.scenes_bucket = s3.Bucket(
        self,
        "emr_output_scenes",
        bucket_name=f"detected-scenes-{self.account}",
        encryption=s3.BucketEncryption.S3_MANAGED,
    )

    self._emr_profile = self.init_emr_profile(
        vpc=vpc,
        log_bucket=log_bucket,
        artifact_bucket=artifact_bucket,
    )
    self.artifact_bucket = artifact_bucket
    self.authorize_buckets(
        input_data_bucket_arns=self.INPUT_BUCKETS,
        output_data_bucket_arns=[
            self.synchronized_bucket.bucket_arn,
            self.scenes_bucket.bucket_arn,
        ],
    )

    # Relative path: resolved against the working directory of the process
    # running the CDK app, both for the asset upload and the listing below.
    bootstrap_dir = "infrastructure/emr_launch/bootstrap_actions/"
    # Upload the local scripts under the "bootstrap_actions/" key prefix.
    s3d.BucketDeployment(
        self,
        id="bootstrap_actions",
        destination_bucket=artifact_bucket,
        destination_key_prefix="bootstrap_actions",
        sources=[s3d.Source.asset(bootstrap_dir)],
    )
    # listdir() order is arbitrary, so sort the entries to keep the
    # synthesized template and the bootstrap-action order deterministic.
    # Subdirectories are skipped: their contents are uploaded under a nested
    # prefix, so an S3 path to the directory name itself would not exist.
    bootstrap_actions = [
        f"s3://{artifact_bucket.bucket_name}/bootstrap_actions/{name}"
        for name in sorted(listdir(bootstrap_dir))
        if isfile(join(bootstrap_dir, name))
    ]

    self._cluster_configuration = self.emr_resource_config(
        subnet=subnet,
        master_instance_type=self.MASTER_INSTANCE_TYPE,
        core_instance_type=self.CORE_INSTANCE_TYPE,
        core_instance_count=self.CORE_INSTANCE_COUNT,
        release_label=self.RELEASE_LABEL,
        applications=self.APPLICATIONS,
        configuration=self.CONFIGURATION,
        core_instance_market=self.CORE_INSTANCE_MARKET,
        task_instance_type=self.TASK_INSTANCE_TYPE,
        task_instance_market=self.TASK_INSTANCE_MARKET,
        task_instance_count=self.TASK_INSTANCE_COUNT,
        bootstrap_action_script_paths=bootstrap_actions,
    )
    self._launch_function = self.launch_function_config(
        emr_profile=self.emr_profile,
        cluster_configuration=self.cluster_configuration,
        default_fail_if_cluster_running=True,
    )
    self.outputs()