in lib/ramble/ramble/application.py [0:0]
def _define_commands(self, exec_graph, success_list=None):
    """Populate the internal command lists from the executable graph.

    Two parallel lists are filled on the instance:

    * ``self._command_list``: every command line for this experiment,
      including the log purge/setup lines and each command's output
      redirection and background suffix.
    * ``self._command_list_without_logs``: the same commands without the
      log purge/setup lines and without the redirect/background suffix.

    The method returns immediately when ``self._command_list`` is already
    populated, so repeated calls do not rebuild the lists.

    Args:
        exec_graph: Object whose ``walk()`` yields nodes exposing ``key`` and
            ``attribute``. An attribute is either a
            ``ramble.util.executable.CommandExecutable`` or a callable
            (builtin) that returns an iterable of command strings.
        success_list: Criteria forwarded to ``self._analysis_dicts`` to find
            the log files that must be purged. When ``None``, a new
            ``ramble.success_criteria.ScopedCriteriaList`` is used.
    """
    if self._command_list:
        return

    self._command_list = []
    self._command_list_without_logs = []

    if success_list is None:
        success_list = ramble.success_criteria.ScopedCriteriaList()

    # Inject all prepended chained experiments.
    # NOTE(review): these are inserted verbatim, while the appended chain
    # commands at the end of this method go through the expander. Confirm
    # the asymmetry is intentional.
    for chained_exp in self.chain_prepend:
        chain_cmd = self.chain_commands[chained_exp]
        self._command_list.append(chain_cmd)
        self._command_list_without_logs.append(chain_cmd)

    # Ensure all log files are purged and set up before any command runs.
    logs = []
    for exec_node in exec_graph.walk():
        exec_cmd = exec_node.attribute
        if isinstance(exec_cmd, ramble.util.executable.CommandExecutable) and exec_cmd.redirect:
            # NOTE(review): expanded without the per-executable variables
            # that are used when the redirect is rendered further down, so
            # the two paths may differ if the redirect references them.
            logs.append(self.expander.expand_var(exec_cmd.redirect))

    analysis_logs, _, _ = self._analysis_dicts(success_list)
    logs.extend(analysis_logs)

    # dict.fromkeys drops duplicates while keeping first-seen order.
    for log in dict.fromkeys(logs):
        self._command_list.append(f'rm -f "{log}"')
        self._command_list.append(f'touch "{log}"')

    for exec_node in exec_graph.walk():
        exec_key = exec_node.key
        exec_attr = exec_node.attribute
        is_command = isinstance(exec_attr, ramble.util.executable.CommandExecutable)

        exec_vars = {"executable_name": exec_key}
        if is_command:
            exec_vars.update(exec_attr.variables)

        # Determine the applicable modifiers once; they are needed both for
        # variable injection and for wrapping the executable.
        applicable_mods = [
            mod for mod in self._modifier_instances if mod.applies_to_executable(exec_key)
        ]
        for mod in applicable_mods:
            # Each modifier sees the variables contributed by earlier ones.
            exec_vars.update(mod.modded_variables(self, exec_vars))

        if not is_command:
            # All builtins: callables that return their own command strings.
            for cmd in exec_attr():
                expanded = self.expander.expand_var(cmd, exec_vars)
                self._command_list.append(expanded)
                self._command_list_without_logs.append(expanded)
            continue

        # Process directive defined executables. Modifiers are handed a copy
        # of the executable rather than the graph's own object.
        base_command = exec_attr.copy()
        pre_commands = []
        post_commands = []
        for mod in applicable_mods:
            pre_cmd, post_cmd = mod.apply_executable_modifiers(
                exec_key, base_command, app_inst=self
            )
            pre_commands.extend(pre_cmd)
            post_commands.extend(post_cmd)

        for cmd_conf in [*pre_commands, base_command, *post_commands]:
            mpi_cmd = ""
            if cmd_conf.mpi:
                raw_mpi_cmd = self.expander.expand_var("{mpi_command}", exec_vars).strip()
                if (
                    not raw_mpi_cmd
                    and int(self.expander.expand_var_name(self.keywords.n_nodes)) > 1
                ):
                    logger.warn(
                        f"Command {cmd_conf.name} requires a non-empty `mpi_command` "
                        "variable in a multi-node experiment"
                    )
                # Surrounding spaces separate the launcher from the template
                # text; the leading one is stripped after expansion.
                mpi_cmd = f" {raw_mpi_cmd} "

            redirect = ""
            if cmd_conf.redirect:
                out_log = self.expander.expand_var(cmd_conf.redirect, exec_vars)
                redirect = output_mapper().generate_out_string(out_log, cmd_conf.output_capture)

            bg_cmd = " &" if cmd_conf.run_in_background else ""

            # The suffix does not depend on the template line, so it is
            # expanded once per command instead of once per line.
            suffix_cmd = self.expander.expand_var(f"{redirect}{bg_cmd}", exec_vars).lstrip()

            for part in cmd_conf.template:
                expanded_cmd = self.expander.expand_var(f"{mpi_cmd}{part}", exec_vars).lstrip()
                # rstrip removes the joining space when the suffix is empty.
                self._command_list.append(f"{expanded_cmd} {suffix_cmd}".rstrip())
                self._command_list_without_logs.append(expanded_cmd)

    # Inject all appended chained experiments.
    for chained_exp in self.chain_append:
        expanded = self.expander.expand_var(self.chain_commands[chained_exp])
        self._command_list.append(expanded)
        self._command_list_without_logs.append(expanded)