in infrastructure-provisioning/src/general/scripts/aws/dataengine-service_create.py [0:0]
def _parse_cluster_tags(raw_tags):
    """Convert a ``key=value`` list into EMR tag dictionaries.

    Pairs are separated by any run of commas and/or spaces. Empty fragments
    (empty input, leading/trailing separators) are skipped, and only the
    first ``=`` splits key from value, so values may contain ``=``.

    Raises:
        ValueError: if a non-empty fragment contains no ``=``.
    """
    tags = []
    for pair in re.split('[, ]+', raw_tags):
        if not pair:
            continue
        key, value = pair.split("=", 1)
        tags.append({"Value": value, "Key": key})
    return tags


def build_emr_cluster(args):
    """Build an EMR ``run_job_flow`` request from ``args`` and submit it.

    Reads from ``args``: applications, tags, service_base_name, name,
    release_label, s3_bucket, steps, cp_jars_2_s3, dry_run,
    slave_instance_spot, bid_price, instance_count, master_instance_type,
    slave_instance_type, ssh_key, auto_terminate, subnet, additional_emr_sg,
    ec2_role, service_role and configurations.

    Reads from the environment: conf_tag_resource_id, conf_billing_tag_key,
    conf_billing_tag_value and aws_notebook_vpc_id.

    Side effect: when ``args.steps`` is non-empty it is appended to the
    module-level ``cp_config`` string (the global itself is modified).

    Returns:
        The ``JobFlowId`` from the ``run_job_flow`` response, or ``None``
        when ``args.dry_run`` is truthy or when any step fails (the failure
        is logged rather than raised).
    """
    global cp_config
    try:
        # Applications arrive as a single space-separated string.
        names = [{"Name": app} for app in args.applications.split(" ")]

        # User tags plus the resource-id and billing tags from the environment.
        tags = _parse_cluster_tags(args.tags)
        tags.append({'Key': os.environ['conf_tag_resource_id'],
                     'Value': '{}:{}'.format(args.service_base_name, args.name)})
        tags.append({'Key': os.environ['conf_billing_tag_key'],
                     'Value': os.environ['conf_billing_tag_value']})

        prefix = "jars/" + args.release_label + "/lib/"
        jars_exist = get_object_count(args.s3_bucket, prefix)

        # Extra steps are appended to the shared cp_config; the cp_jars step
        # is added when explicitly requested or when nothing is found under
        # the jars prefix in the bucket.
        if args.steps != '':
            cp_config = cp_config + "; " + args.steps
        if args.cp_jars_2_s3 or jars_exist == 0:
            steps = parse_steps(cp_config + "; " + cp_jars)
        else:
            steps = parse_steps(cp_config)

        if args.dry_run:
            # Dry run: report what would be submitted and stop here.
            logging.info("Build parameters are:")
            logging.info(args)
            logging.info("\n")
            logging.info("Applications to be installed:")
            logging.info(names)
            logging.info("\n")
            logging.info("Cluster tags:")
            logging.info(tags)
            logging.info("\n")
            logging.info("Cluster Jobs:")
            logging.info(steps)
            return None

        emr_client = boto3.client('emr')

        # Settings shared by the spot and on-demand layouts.
        instances = {
            'Ec2KeyName': args.ssh_key,
            'KeepJobFlowAliveWhenNoSteps': not args.auto_terminate,
            'Ec2SubnetId': get_subnet_by_cidr(args.subnet, os.environ['aws_notebook_vpc_id']),
        }
        security_group = get_security_group_by_name(args.additional_emr_sg)
        instances['AdditionalMasterSecurityGroups'] = [security_group]
        instances['AdditionalSlaveSecurityGroups'] = [security_group]

        if args.slave_instance_spot == 'True':
            # Spot layout: one on-demand master, remaining nodes are spot
            # core instances bid at the first five characters of bid_price.
            instances['InstanceGroups'] = [
                {'Market': 'SPOT',
                 'BidPrice': args.bid_price[:5],
                 'InstanceRole': 'CORE',
                 'InstanceType': args.slave_instance_type,
                 'InstanceCount': int(args.instance_count) - 1},
                {'Market': 'ON_DEMAND',
                 'InstanceRole': 'MASTER',
                 'InstanceType': args.master_instance_type,
                 'InstanceCount': 1},
            ]
        else:
            instances['MasterInstanceType'] = args.master_instance_type
            instances['SlaveInstanceType'] = args.slave_instance_type
            # Converted like the spot branch so a string count is accepted.
            instances['InstanceCount'] = int(args.instance_count)

        # NOTE(review): VisibleToAllUsers is tied to auto_terminate exactly
        # as before -- confirm this coupling is intended.
        result = emr_client.run_job_flow(
            Name=args.name,
            ReleaseLabel=args.release_label,
            Instances=instances,
            Applications=names,
            Tags=tags,
            Steps=steps,
            VisibleToAllUsers=not args.auto_terminate,
            JobFlowRole=args.ec2_role,
            ServiceRole=args.service_role,
            Configurations=ast.literal_eval(args.configurations))
        logging.info("Cluster_id {}".format(result.get('JobFlowId')))
        return result.get('JobFlowId')
    except Exception as err:
        # Keep the traceback on stdout as before; print_exc() returns None,
        # so the text for the log message must come from format_exc().
        traceback.print_exc(file=sys.stdout)
        logging.error("Failed to build EMR cluster: " +
                      str(err) + "\n Traceback: " +
                      traceback.format_exc())