in awscli/customizations/emr/createcluster.py [0:0]
def _run_main_command(self, parsed_args, parsed_globals):
params = {}
params['Name'] = parsed_args.name
self._validate_release_label_ami_version(parsed_args)
service_role_validation_message = (
" Either choose --use-default-roles or use both --service-role "
"<roleName> and --ec2-attributes InstanceProfile=<profileName>.")
if parsed_args.use_default_roles is True and \
parsed_args.service_role is not None:
raise exceptions.MutualExclusiveOptionError(
option1="--use-default-roles",
option2="--service-role",
message=service_role_validation_message)
if parsed_args.use_default_roles is True and \
parsed_args.ec2_attributes is not None and \
'InstanceProfile' in parsed_args.ec2_attributes:
raise exceptions.MutualExclusiveOptionError(
option1="--use-default-roles",
option2="--ec2-attributes InstanceProfile",
message=service_role_validation_message)
if parsed_args.instance_groups is not None and \
parsed_args.instance_fleets is not None:
raise exceptions.MutualExclusiveOptionError(
option1="--instance-groups",
option2="--instance-fleets")
instances_config = {}
if parsed_args.instance_fleets is not None:
instances_config['InstanceFleets'] = \
instancefleetsutils.validate_and_build_instance_fleets(
parsed_args.instance_fleets)
else:
instances_config['InstanceGroups'] = \
instancegroupsutils.validate_and_build_instance_groups(
instance_groups=parsed_args.instance_groups,
instance_type=parsed_args.instance_type,
instance_count=parsed_args.instance_count)
if parsed_args.release_label is not None:
params["ReleaseLabel"] = parsed_args.release_label
if parsed_args.configurations is not None:
try:
params["Configurations"] = json.loads(
parsed_args.configurations)
except ValueError:
raise ValueError('aws: error: invalid json argument for '
'option --configurations')
if (
parsed_args.release_label is None
and parsed_args.ami_version is not None
):
is_valid_ami_version = re.match(r'\d?\..*', parsed_args.ami_version)
if is_valid_ami_version is None:
raise exceptions.InvalidAmiVersionError(
ami_version=parsed_args.ami_version)
params['AmiVersion'] = parsed_args.ami_version
emrutils.apply_dict(
params, 'AdditionalInfo', parsed_args.additional_info)
emrutils.apply_dict(params, 'LogUri', parsed_args.log_uri)
if parsed_args.os_release_label is not None:
emrutils.apply_dict(params, 'OSReleaseLabel',
parsed_args.os_release_label)
if parsed_args.log_encryption_kms_key_id is not None:
emrutils.apply_dict(params, 'LogEncryptionKmsKeyId',
parsed_args.log_encryption_kms_key_id)
if parsed_args.use_default_roles is True:
parsed_args.service_role = EMR_ROLE_NAME
if parsed_args.ec2_attributes is None:
parsed_args.ec2_attributes = {}
parsed_args.ec2_attributes['InstanceProfile'] = EC2_ROLE_NAME
emrutils.apply_dict(params, 'ServiceRole', parsed_args.service_role)
if parsed_args.instance_groups is not None:
for instance_group in instances_config['InstanceGroups']:
if 'AutoScalingPolicy' in instance_group.keys():
if parsed_args.auto_scaling_role is None:
raise exceptions.MissingAutoScalingRoleError()
emrutils.apply_dict(params, 'AutoScalingRole', parsed_args.auto_scaling_role)
if parsed_args.scale_down_behavior is not None:
emrutils.apply_dict(params, 'ScaleDownBehavior', parsed_args.scale_down_behavior)
if (
parsed_args.no_auto_terminate is False and
parsed_args.auto_terminate is False):
parsed_args.no_auto_terminate = True
instances_config['KeepJobFlowAliveWhenNoSteps'] = \
emrutils.apply_boolean_options(
parsed_args.no_auto_terminate,
'--no-auto-terminate',
parsed_args.auto_terminate,
'--auto-terminate')
instances_config['TerminationProtected'] = \
emrutils.apply_boolean_options(
parsed_args.termination_protected,
'--termination-protected',
parsed_args.no_termination_protected,
'--no-termination-protected')
if (parsed_args.unhealthy_node_replacement or parsed_args.no_unhealthy_node_replacement):
instances_config['UnhealthyNodeReplacement'] = \
emrutils.apply_boolean_options(
parsed_args.unhealthy_node_replacement,
'--unhealthy-node-replacement',
parsed_args.no_unhealthy_node_replacement,
'--no-unhealthy-node-replacement')
if (parsed_args.visible_to_all_users is False and
parsed_args.no_visible_to_all_users is False):
parsed_args.visible_to_all_users = True
params['VisibleToAllUsers'] = \
emrutils.apply_boolean_options(
parsed_args.visible_to_all_users,
'--visible-to-all-users',
parsed_args.no_visible_to_all_users,
'--no-visible-to-all-users')
params['Tags'] = emrutils.parse_tags(parsed_args.tags)
params['Instances'] = instances_config
if parsed_args.ec2_attributes is not None:
self._build_ec2_attributes(
cluster=params, parsed_attrs=parsed_args.ec2_attributes)
debugging_enabled = emrutils.apply_boolean_options(
parsed_args.enable_debugging,
'--enable-debugging',
parsed_args.no_enable_debugging,
'--no-enable-debugging')
if parsed_args.log_uri is None and debugging_enabled is True:
raise exceptions.LogUriError
if debugging_enabled is True:
self._update_cluster_dict(
cluster=params,
key='Steps',
value=[
self._build_enable_debugging(parsed_args, parsed_globals)])
if parsed_args.applications is not None:
if parsed_args.release_label is None:
app_list, ba_list, step_list = \
applicationutils.build_applications(
region=self.region,
parsed_applications=parsed_args.applications,
ami_version=params['AmiVersion'])
self._update_cluster_dict(
params, 'NewSupportedProducts', app_list)
self._update_cluster_dict(
params, 'BootstrapActions', ba_list)
self._update_cluster_dict(
params, 'Steps', step_list)
else:
params["Applications"] = []
for application in parsed_args.applications:
params["Applications"].append(application)
hbase_restore_config = parsed_args.restore_from_hbase_backup
if hbase_restore_config is not None:
args = hbaseutils.build_hbase_restore_from_backup_args(
dir=hbase_restore_config.get('Dir'),
backup_version=hbase_restore_config.get('BackupVersion'))
step_config = emrutils.build_step(
jar=constants.HBASE_JAR_PATH,
name=constants.HBASE_RESTORE_STEP_NAME,
action_on_failure=constants.CANCEL_AND_WAIT,
args=args)
self._update_cluster_dict(
params, 'Steps', [step_config])
if parsed_args.bootstrap_actions is not None:
self._build_bootstrap_actions(
cluster=params,
parsed_boostrap_actions=parsed_args.bootstrap_actions)
if parsed_args.emrfs is not None:
self._handle_emrfs_parameters(
cluster=params,
emrfs_args=parsed_args.emrfs,
release_label=parsed_args.release_label)
if parsed_args.steps is not None:
steps_list = steputils.build_step_config_list(
parsed_step_list=parsed_args.steps,
region=self.region,
release_label=parsed_args.release_label)
self._update_cluster_dict(
cluster=params, key='Steps', value=steps_list)
if parsed_args.security_configuration is not None:
emrutils.apply_dict(
params, 'SecurityConfiguration', parsed_args.security_configuration)
if parsed_args.custom_ami_id is not None:
emrutils.apply_dict(
params, 'CustomAmiId', parsed_args.custom_ami_id
)
if parsed_args.ebs_root_volume_size is not None:
emrutils.apply_dict(
params, 'EbsRootVolumeSize', int(parsed_args.ebs_root_volume_size)
)
if parsed_args.ebs_root_volume_iops is not None:
emrutils.apply_dict(
params, 'EbsRootVolumeIops', int(parsed_args.ebs_root_volume_iops)
)
if parsed_args.ebs_root_volume_throughput is not None:
emrutils.apply_dict(
params, 'EbsRootVolumeThroughput', int(parsed_args.ebs_root_volume_throughput)
)
if parsed_args.repo_upgrade_on_boot is not None:
emrutils.apply_dict(
params, 'RepoUpgradeOnBoot', parsed_args.repo_upgrade_on_boot
)
if parsed_args.kerberos_attributes is not None:
emrutils.apply_dict(
params, 'KerberosAttributes', parsed_args.kerberos_attributes)
if parsed_args.step_concurrency_level is not None:
params['StepConcurrencyLevel'] = parsed_args.step_concurrency_level
if parsed_args.managed_scaling_policy is not None:
emrutils.apply_dict(
params, 'ManagedScalingPolicy', parsed_args.managed_scaling_policy)
if parsed_args.placement_group_configs is not None:
emrutils.apply_dict(
params, 'PlacementGroupConfigs',
parsed_args.placement_group_configs)
if parsed_args.auto_termination_policy is not None:
emrutils.apply_dict(
params, 'AutoTerminationPolicy',
parsed_args.auto_termination_policy)
self._validate_required_applications(parsed_args)
run_job_flow_response = emrutils.call(
self._session, 'run_job_flow', params, self.region,
parsed_globals.endpoint_url, parsed_globals.verify_ssl)
constructed_result = self._construct_result(run_job_flow_response)
emrutils.display_response(self._session, 'run_job_flow',
constructed_result, parsed_globals)
return 0