in awsbatch-cli/src/awsbatch/common.py [0:0]
def __init_from_config(self, cli_config_file, cluster, log): # noqa: C901 FIXME
"""
Init object attributes from awsbatch-cli configuration file.
:param cli_config_file: awsbatch-cli config
:param cluster: cluster name
:param log: log
"""
with open(cli_config_file, encoding="utf-8") as config_file:
log.info("Searching for configuration file %s" % cli_config_file)
config = ConfigParser()
config.read_file(config_file)
# use cluster if there or search for default value in [main] section of the config file
try:
cluster_name = cluster if cluster else config.get("main", "cluster_name")
except NoSectionError as e:
fail("Error getting the section [%s] from the configuration file (%s)" % (e.section, cli_config_file))
except NoOptionError as e:
fail(
"Error getting the option (%s) from the section [%s] of the configuration file (%s)"
% (e.option, e.section, cli_config_file)
)
cluster_section = "cluster {0}".format(cluster_name)
try:
self.region = config.get("main", "region")
except NoOptionError:
pass
try:
self.env_blacklist = config.get("main", "env_blacklist")
except NoOptionError:
pass
try:
self.stack_name = cluster_name
log.info("Stack name is (%s)" % self.stack_name)
# if region is set for the current stack, override the region from the AWS ParallelCluster config file
# or the region from the [main] section
self.region = config.get(cluster_section, "region")
self.s3_bucket = config.get(cluster_section, "s3_bucket")
self.artifact_directory = config.get(cluster_section, "artifact_directory")
self.batch_cli_requirements = config.get(cluster_section, "batch_cli_requirements")
self.compute_environment = config.get(cluster_section, "compute_environment")
self.job_queue = config.get(cluster_section, "job_queue")
self.job_definition = config.get(cluster_section, "job_definition")
try:
self.job_definition_mnp = config.get(cluster_section, "job_definition_mnp")
except NoOptionError:
pass
self.head_node_ip = config.get(cluster_section, "head_node_ip")
# get proxy
self.proxy = config.get(cluster_section, "proxy")
if self.proxy != "NONE":
log.info("Configured proxy is: %s" % self.proxy)
except NoSectionError:
# initialize by getting stack info
self.__init_from_stack(cluster_name, log)
except NoOptionError as e:
fail(
"Error getting the option (%s) from the section [%s] of the configuration file (%s)"
% (e.option, e.section, cli_config_file)
)