in ambari-agent/src/main/python/ambari_agent/ActionQueue.py [0:0]
def execute_command(self, command):
"""
Executes commands of type EXECUTION_COMMAND
"""
cluster_id = command["clusterId"]
command_id = command["commandId"]
command_type = command["commandType"]
num_attempts = 0
retry_duration = 0 # even with 0 allow one attempt
retry_able = False
delay = 1
log_command_output = True
command_canceled = False
command_result = {}
message = (
"Executing command with id = {commandId}, taskId = {taskId} for role = {role} of "
"cluster_id {cluster}.".format(
commandId=str(command_id),
taskId=str(command["taskId"]),
role=command["role"],
cluster=cluster_id,
)
)
logger.info(message)
taskId = command["taskId"]
# Preparing 'IN_PROGRESS' report
in_progress_status = self.commandStatuses.generate_report_template(command)
# The paths of the files that contain the output log and error log use a prefix that the agent advertises to the
# server. The prefix is defined in agent-config.ini.
if command_type != AgentCommand.auto_execution:
in_progress_status.update(
{
"tmpout": self.tmpdir + os.sep + "output-" + str(taskId) + ".txt",
"tmperr": self.tmpdir + os.sep + "errors-" + str(taskId) + ".txt",
"structuredOut": self.tmpdir
+ os.sep
+ "structured-out-"
+ str(taskId)
+ ".json",
"status": CommandStatus.in_progress,
}
)
else:
in_progress_status.update(
{
"tmpout": self.tmpdir + os.sep + "auto_output-" + str(taskId) + ".txt",
"tmperr": self.tmpdir + os.sep + "auto_errors-" + str(taskId) + ".txt",
"structuredOut": self.tmpdir
+ os.sep
+ "auto_structured-out-"
+ str(taskId)
+ ".json",
"status": CommandStatus.in_progress,
}
)
self.commandStatuses.put_command_status(command, in_progress_status)
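# Read retry and output-logging settings from the command parameters, if present.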
if "commandParams" in command:
if "max_duration_for_retries" in command["commandParams"]:
retry_duration = int(command["commandParams"]["max_duration_for_retries"])
if (
"command_retry_enabled" in command["commandParams"]
and command_type != AgentCommand.auto_execution
):
# retry_able must always remain False for AgentCommand.auto_execution commands
retry_able = command["commandParams"]["command_retry_enabled"] == "true"
if "log_output" in command["commandParams"]:
log_command_output = command["commandParams"]["log_output"] != "false"
logger.info(
"Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec)"
" = {retryDuration}, log_output = {log_command_output}".format(
taskId=taskId,
retryAble=retry_able,
retryDuration=retry_duration,
log_command_output=log_command_output,
)
)
self.cancelEvent.clear()
# for the case of a command reschedule (e.g. a command and its cancel for the same taskId are sent at the same time)
self.taskIdsToCancel.discard(taskId)
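# Execution loop: run the command, retrying on failure (when retries are enabled) until the retry budget is exhausted or the command is canceled.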
while retry_duration >= 0:
if taskId in self.taskIdsToCancel:
logger.info(f"Command with taskId = {taskId} canceled")
command_canceled = True
self.taskIdsToCancel.discard(taskId)
break
num_attempts += 1
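# When retries are enabled, time this attempt so the elapsed seconds can be deducted from the retry budget below.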
start = 0
if retry_able:
start = int(time.time())
# running command
command_result = self.customServiceOrchestrator.runCommand(
command,
in_progress_status["tmpout"],
in_progress_status["tmperr"],
override_output_files=num_attempts == 1,
retry=num_attempts > 1,
)
end = 1
if retry_able:
end = int(time.time())
retry_duration -= end - start
# dumping results
if command_type == AgentCommand.background_execution:
logger.info(
"Command is background command, quit retrying. Exit code: {exitCode}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}".format(
cid=taskId,
exitCode=command_result["exitcode"],
retryAble=retry_able,
retryDuration=retry_duration,
delay=delay,
)
)
return
else:
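# Translate the exit code into a command status; a negative SIGTERM/SIGKILL exit code means the command was canceled.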
if command_result["exitcode"] == 0:
status = CommandStatus.completed
else:
status = CommandStatus.failed
if (command_result["exitcode"] == -signal.SIGTERM) or (
command_result["exitcode"] == -signal.SIGKILL
):
logger.info(f"Command with taskId = {taskId} was canceled!")
command_canceled = True
self.taskIdsToCancel.discard(taskId)
break
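# The command failed, retries are enabled and budget remains: back off (capped by the remaining budget) and try again.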
if status != CommandStatus.completed and retry_able and retry_duration > 0:
delay = self.get_retry_delay(delay)
if delay > retry_duration:
delay = retry_duration
retry_duration -= delay # allow one last attempt
command_result["stderr"] += (
"\n\nCommand failed. Retrying command execution ...\n\n"
)
logger.info(f"Retrying command with taskId = {taskId} after a wait of {delay}")
if "agentLevelParams" not in command:
command["agentLevelParams"] = {}
command["agentLevelParams"]["commandBeingRetried"] = "true"
self.cancelEvent.wait(delay) # wake up if something was canceled
continue
else:
logger.info(
"Quit retrying for command with taskId = {cid}. Status: {status}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}".format(
cid=taskId,
status=status,
retryAble=retry_able,
retryDuration=retry_duration,
delay=delay,
)
)
break
self.taskIdsToCancel.discard(taskId)
# do not fail a task that was rescheduled by the server
if command_canceled:
with self.lock, self.commandQueue.mutex:
for com in self.commandQueue.queue:
if com["taskId"] == command["taskId"]:
logger.info(
"Command with taskId = {cid} was rescheduled by server. "
"Fail report on cancelled command won't be sent with heartbeat.".format(
cid=taskId
)
)
self.commandStatuses.delete_command_data(command["taskId"])
return
# final result to stdout
command_result["stdout"] += (
"\n\nCommand completed successfully!\n"
if status == CommandStatus.completed
else "\n\nCommand failed after " + str(num_attempts) + " tries\n"
)
logger.info(
f"Command with taskId = {taskId} completed successfully!"
if status == CommandStatus.completed
else f"Command with taskId = {taskId} failed after {num_attempts} tries"
)
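# Assemble the final report (stdout, stderr, exit code, status) to publish for the server.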
role_result = self.commandStatuses.generate_report_template(command)
role_result.update(
{
"stdout": command_result["stdout"],
"stderr": command_result["stderr"],
"exitCode": command_result["exitcode"],
"status": status,
}
)
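# Mirror the command's stdout/stderr into the agent log when log_command_executes is enabled and the command allows output logging.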
if (
self.config.has_option("logging", "log_command_executes")
and int(self.config.get("logging", "log_command_executes")) == 1
and log_command_output
):
if role_result["stdout"] != "":
logger.info(
"Begin command output log for command with id = "
+ str(command["taskId"])
+ ", role = "
+ command["role"]
+ ", roleCommand = "
+ command["roleCommand"]
)
self.log_command_output(role_result["stdout"], str(command["taskId"]))
logger.info(
"End command output log for command with id = "
+ str(command["taskId"])
+ ", role = "
+ command["role"]
+ ", roleCommand = "
+ command["roleCommand"]
)
if role_result["stderr"] != "":
logger.info(
"Begin command stderr log for command with id = "
+ str(command["taskId"])
+ ", role = "
+ command["role"]
+ ", roleCommand = "
+ command["roleCommand"]
)
self.log_command_output(role_result["stderr"], str(command["taskId"]))
logger.info(
"End command stderr log for command with id = "
+ str(command["taskId"])
+ ", role = "
+ command["role"]
+ ", roleCommand = "
+ command["roleCommand"]
)
if role_result["stdout"] == "":
role_result["stdout"] = "None"
if role_result["stderr"] == "":
role_result["stderr"] = "None"
# let Ambari know the name of the custom command
if "commandParams" in command and "custom_command" in command["commandParams"]:
role_result["customCommand"] = command["commandParams"]["custom_command"]
if "structuredOut" in command_result:
role_result["structuredOut"] = str(json.dumps(command_result["structuredOut"]))
else:
role_result["structuredOut"] = ""
self.recovery_manager.process_execution_command_result(command, status)
self.commandStatuses.put_command_status(command, role_result)
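# For commands tied to a real cluster and service, run a component status check and report the result.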
cluster_id = str(command["clusterId"])
if cluster_id != "-1" and cluster_id != "null":
service_name = command["serviceName"]
if service_name != "null":
component_name = command["role"]
self.component_status_executor.check_component_status(
cluster_id, service_name, component_name, "STATUS", report=True
)