def build_emr_cluster()

in infrastructure-provisioning/src/general/scripts/aws/dataengine-service_create.py [0:0]


def _emr_application_names(applications):
    """Turn a whitespace-separated application list into EMR ``Applications`` entries.

    Repeated, leading or trailing whitespace is ignored, so ``"Hadoop  Spark "``
    yields two entries instead of entries with an empty ``Name``.
    """
    return [{"Name": app} for app in (applications or "").split()]


def _emr_tags(args):
    """Build the EMR ``Tags`` list from ``args.tags`` plus the service tags.

    ``args.tags`` is a list of ``key=value`` pairs separated by commas and/or
    spaces. Only the first ``=`` separates key from value, so values may
    themselves contain ``=``. Empty tokens (from an empty string or a stray
    separator) are skipped. The resource-id tag and the billing tag, whose
    keys/values come from the environment, are always appended.

    Raises:
        ValueError: if a non-empty token contains no ``=``.
        KeyError: if one of the required ``conf_*`` environment variables is unset.
    """
    tags = []
    for token in re.split(r'[, ]+', args.tags or ''):
        if not token:
            continue
        key, sep, value = token.partition("=")
        if not sep:
            raise ValueError("Malformed tag {!r}: expected key=value".format(token))
        tags.append({"Value": value, "Key": key})
    tags.append({'Key': os.environ['conf_tag_resource_id'],
                 'Value': '{}:{}'.format(args.service_base_name, args.name)})
    tags.append({'Key': os.environ['conf_billing_tag_key'],
                 'Value': os.environ['conf_billing_tag_value']})
    return tags


def _emr_steps(args):
    """Return the parsed EMR steps to submit together with the cluster.

    The module-level ``cp_config`` step string is extended with any
    user-supplied ``args.steps``. The module-level ``cp_jars`` steps are
    appended when explicitly requested (``args.cp_jars_2_s3``) or when no
    objects exist under ``jars/<release_label>/lib/`` in ``args.s3_bucket``.
    """
    # Work on a local copy instead of rebinding the module-level cp_config;
    # rebinding made repeated calls accumulate args.steps.
    step_config = cp_config
    if args.steps:
        step_config = step_config + "; " + args.steps
    prefix = "jars/" + args.release_label + "/lib/"
    if args.cp_jars_2_s3 or get_object_count(args.s3_bucket, prefix) == 0:
        step_config = step_config + "; " + cp_jars
    return parse_steps(step_config)


def _emr_instances(args):
    """Build the ``Instances`` argument for ``run_job_flow``.

    When ``args.slave_instance_spot == 'True'`` the core nodes are requested as
    a SPOT instance group (bid price truncated to its first 5 characters) next
    to a single ON_DEMAND master. Otherwise the uniform
    ``MasterInstanceType``/``SlaveInstanceType``/``InstanceCount`` form is used.
    """
    # Convert once for both layouts (previously only the SPOT branch did).
    instance_count = int(args.instance_count)
    security_group = get_security_group_by_name(args.additional_emr_sg)
    instances = {
        'Ec2KeyName': args.ssh_key,
        'KeepJobFlowAliveWhenNoSteps': not args.auto_terminate,
        'Ec2SubnetId': get_subnet_by_cidr(args.subnet, os.environ['aws_notebook_vpc_id']),
        'AdditionalMasterSecurityGroups': [security_group],
        'AdditionalSlaveSecurityGroups': [security_group],
    }
    if args.slave_instance_spot == 'True':
        instances['InstanceGroups'] = [
            {'Market': 'SPOT',
             'BidPrice': args.bid_price[:5],
             'InstanceRole': 'CORE',
             'InstanceType': args.slave_instance_type,
             # The requested total includes the single master node below.
             'InstanceCount': instance_count - 1},
            {'Market': 'ON_DEMAND',
             'InstanceRole': 'MASTER',
             'InstanceType': args.master_instance_type,
             'InstanceCount': 1}]
    else:
        instances.update({
            'MasterInstanceType': args.master_instance_type,
            'SlaveInstanceType': args.slave_instance_type,
            'InstanceCount': instance_count,
        })
    return instances


def build_emr_cluster(args):
    """Create an EMR cluster with ``run_job_flow``, or only log the plan on a dry run.

    Args:
        args: parsed command-line namespace. Attributes read include
            applications, tags, steps, cp_jars_2_s3, release_label, s3_bucket,
            dry_run, name, service_base_name, master/slave instance types,
            instance_count, slave_instance_spot, bid_price, ssh_key, subnet,
            additional_emr_sg, auto_terminate, ec2_role, service_role and
            configurations.

    Returns:
        The new cluster's ``JobFlowId``, or ``None`` when ``args.dry_run`` is set.

    Raises:
        Exception: any failure is logged together with its traceback and re-raised.
    """
    try:
        names = _emr_application_names(args.applications)
        tags = _emr_tags(args)
        steps = _emr_steps(args)

        if args.dry_run:
            logging.info("Build parameters are:")
            logging.info(args)
            logging.info("\n")
            logging.info("Applications to be installed:")
            logging.info(names)
            logging.info("\n")
            logging.info("Cluster tags:")
            logging.info(tags)
            logging.info("\n")
            logging.info("Cluster Jobs:")
            logging.info(steps)
            return None

        emr_client = boto3.client('emr')
        result = emr_client.run_job_flow(
            Name=args.name,
            ReleaseLabel=args.release_label,
            Instances=_emr_instances(args),
            Applications=names,
            Tags=tags,
            Steps=steps,
            VisibleToAllUsers=not args.auto_terminate,
            JobFlowRole=args.ec2_role,
            ServiceRole=args.service_role,
            # literal_eval accepts only Python literals, so this string cannot run code.
            Configurations=ast.literal_eval(args.configurations))
        cluster_id = result.get('JobFlowId')
        logging.info("Cluster_id {}".format(cluster_id))
        return cluster_id
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # concatenating it raised a TypeError that hid the real error.
        logging.error("Failed to build EMR cluster: " + str(err) +
                      "\n Traceback: " + traceback.format_exc())
        raise