def execute_command()

in ambari-agent/src/main/python/ambari_agent/ActionQueue.py


  def execute_command(self, command):
    """
    Executes commands of type EXECUTION_COMMAND
    """
    cluster_id = command["clusterId"]
    command_id = command["commandId"]
    command_type = command["commandType"]

    num_attempts = 0
    retry_duration = 0  # a duration of 0 still allows one attempt
    retry_able = False
    delay = 1
    log_command_output = True
    command_canceled = False
    command_result = {}

    message = (
      "Executing command with id = {commandId}, taskId = {taskId} for role = {role} of "
      "cluster_id {cluster}.".format(
        commandId=str(command_id),
        taskId=str(command["taskId"]),
        role=command["role"],
        cluster=cluster_id,
      )
    )
    logger.info(message)

    taskId = command["taskId"]
    # Preparing 'IN_PROGRESS' report
    in_progress_status = self.commandStatuses.generate_report_template(command)
    # The paths of the files that hold the output and error logs use a prefix that the
    # agent advertises to the server; the prefix is defined in agent-config.ini.
    if command_type != AgentCommand.auto_execution:
      in_progress_status.update(
        {
          "tmpout": self.tmpdir + os.sep + "output-" + str(taskId) + ".txt",
          "tmperr": self.tmpdir + os.sep + "errors-" + str(taskId) + ".txt",
          "structuredOut": self.tmpdir
          + os.sep
          + "structured-out-"
          + str(taskId)
          + ".json",
          "status": CommandStatus.in_progress,
        }
      )
    else:
      in_progress_status.update(
        {
          "tmpout": self.tmpdir + os.sep + "auto_output-" + str(taskId) + ".txt",
          "tmperr": self.tmpdir + os.sep + "auto_errors-" + str(taskId) + ".txt",
          "structuredOut": self.tmpdir
          + os.sep
          + "auto_structured-out-"
          + str(taskId)
          + ".json",
          "status": CommandStatus.in_progress,
        }
      )

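    # publish the IN_PROGRESS report so the server sees the command as running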
    self.commandStatuses.put_command_status(command, in_progress_status)

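    # pull the retry and output-logging settings from commandParams when present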
    if "commandParams" in command:
      if "max_duration_for_retries" in command["commandParams"]:
        retry_duration = int(command["commandParams"]["max_duration_for_retries"])
      if (
        "command_retry_enabled" in command["commandParams"]
        and command_type != AgentCommand.auto_execution
      ):
        # retry_able must always stay False for AgentCommand.auto_execution commands
        retry_able = command["commandParams"]["command_retry_enabled"] == "true"
      if "log_output" in command["commandParams"]:
        log_command_output = command["commandParams"]["log_output"] != "false"

    logger.info(
      "Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec)"
      " = {retryDuration}, log_output = {log_command_output}".format(
        taskId=taskId,
        retryAble=retry_able,
        retryDuration=retry_duration,
        log_command_output=log_command_output,
      )
    )

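    # reset the shared cancel event so an earlier cancellation does not interrupt this execution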
    self.cancelEvent.clear()
    # handle command reschedule (e.g. a command and a cancel for the same taskId are sent at the same time)
    self.taskIdsToCancel.discard(taskId)

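    # attempt the command until it succeeds, is canceled, or the retry budget is exhausted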
    while retry_duration >= 0:
      if taskId in self.taskIdsToCancel:
        logger.info(f"Command with taskId = {taskId} canceled")
        command_canceled = True

        self.taskIdsToCancel.discard(taskId)
        break

      num_attempts += 1
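      # elapsed time is only measured when retries are enabled; otherwise dummy timestamps are used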
      start = 0
      if retry_able:
        start = int(time.time())
      # run the command through the custom service orchestrator
      command_result = self.customServiceOrchestrator.runCommand(
        command,
        in_progress_status["tmpout"],
        in_progress_status["tmperr"],
        override_output_files=num_attempts == 1,
        retry=num_attempts > 1,
      )
      end = 1
      if retry_able:
        end = int(time.time())
      retry_duration -= end - start

      # evaluate the command result
      if command_type == AgentCommand.background_execution:
        logger.info(
          "Command with taskId = {cid} is a background command, quit retrying. Exit code: {exitCode}, "
          "retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}".format(
            cid=taskId,
            exitCode=command_result["exitcode"],
            retryAble=retry_able,
            retryDuration=retry_duration,
            delay=delay,
          )
        )
        return
      else:
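        # map the exit code onto a command status; an exit code of -SIGTERM or -SIGKILL means the command was canceled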
        if command_result["exitcode"] == 0:
          status = CommandStatus.completed
        else:
          status = CommandStatus.failed
          if (command_result["exitcode"] == -signal.SIGTERM) or (
            command_result["exitcode"] == -signal.SIGKILL
          ):
            logger.info(f"Command with taskId = {taskId} was canceled!")
            command_canceled = True
            self.taskIdsToCancel.discard(taskId)
            break

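      # retry only if the command failed, retries are enabled, and there is retry budget left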
      if status != CommandStatus.completed and retry_able and retry_duration > 0:
        delay = self.get_retry_delay(delay)
        if delay > retry_duration:
          delay = retry_duration
        retry_duration -= delay  # allow one last attempt
        command_result["stderr"] += (
          "\n\nCommand failed. Retrying command execution ...\n\n"
        )
        logger.info(f"Retrying command with taskId = {taskId} after a wait of {delay}")
        if "agentLevelParams" not in command:
          command["agentLevelParams"] = {}

        command["agentLevelParams"]["commandBeingRetried"] = "true"
        self.cancelEvent.wait(delay)  # wakes up early if a cancellation arrives during the wait

        continue
      else:
        logger.info(
          "Quit retrying for command with taskId = {cid}. Status: {status}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}".format(
            cid=taskId,
            status=status,
            retryAble=retry_able,
            retryDuration=retry_duration,
            delay=delay,
          )
        )
        break

    self.taskIdsToCancel.discard(taskId)

    # do not fail a task that was rescheduled by the server
    if command_canceled:
      with self.lock, self.commandQueue.mutex:
        for com in self.commandQueue.queue:
          if com["taskId"] == command["taskId"]:
            logger.info(
              "Command with taskId = {cid} was rescheduled by server. "
              "Fail report on cancelled command won't be sent with heartbeat.".format(
                cid=taskId
              )
            )
            self.commandStatuses.delete_command_data(command["taskId"])
            return

    # append the final outcome to the command's stdout
    command_result["stdout"] += (
      "\n\nCommand completed successfully!\n"
      if status == CommandStatus.completed
      else "\n\nCommand failed after " + str(num_attempts) + " tries\n"
    )
    logger.info(
      f"Command with taskId = {taskId} completed successfully!"
      if status == CommandStatus.completed
      else f"Command with taskId = {taskId} failed after {num_attempts} tries"
    )

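    # assemble the final report for the server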
    role_result = self.commandStatuses.generate_report_template(command)
    role_result.update(
      {
        "stdout": command_result["stdout"],
        "stderr": command_result["stderr"],
        "exitCode": command_result["exitcode"],
        "status": status,
      }
    )

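    # optionally mirror the command's stdout/stderr into the agent log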
    if (
      self.config.has_option("logging", "log_command_executes")
      and int(self.config.get("logging", "log_command_executes")) == 1
      and log_command_output
    ):
      if role_result["stdout"] != "":
        logger.info(
          "Begin command output log for command with id = "
          + str(command["taskId"])
          + ", role = "
          + command["role"]
          + ", roleCommand = "
          + command["roleCommand"]
        )
        self.log_command_output(role_result["stdout"], str(command["taskId"]))
        logger.info(
          "End command output log for command with id = "
          + str(command["taskId"])
          + ", role = "
          + command["role"]
          + ", roleCommand = "
          + command["roleCommand"]
        )

      if role_result["stderr"] != "":
        logger.info(
          "Begin command stderr log for command with id = "
          + str(command["taskId"])
          + ", role = "
          + command["role"]
          + ", roleCommand = "
          + command["roleCommand"]
        )
        self.log_command_output(role_result["stderr"], str(command["taskId"]))
        logger.info(
          "End command stderr log for command with id = "
          + str(command["taskId"])
          + ", role = "
          + command["role"]
          + ", roleCommand = "
          + command["roleCommand"]
        )

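    # report the literal string "None" instead of empty output fields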
    if role_result["stdout"] == "":
      role_result["stdout"] = "None"
    if role_result["stderr"] == "":
      role_result["stderr"] = "None"

    # let Ambari know the name of the custom command

    if "commandParams" in command and "custom_command" in command["commandParams"]:
      role_result["customCommand"] = command["commandParams"]["custom_command"]

    if "structuredOut" in command_result:
      role_result["structuredOut"] = str(json.dumps(command_result["structuredOut"]))
    else:
      role_result["structuredOut"] = ""

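    # hand the result to the recovery manager and publish the final status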
    self.recovery_manager.process_execution_command_result(command, status)
    self.commandStatuses.put_command_status(command, role_result)

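    # for commands bound to a real cluster and service, immediately re-check and report the component status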
    cluster_id = str(command["clusterId"])

    if cluster_id != "-1" and cluster_id != "null":
      service_name = command["serviceName"]
      if service_name != "null":
        component_name = command["role"]
        self.component_status_executor.check_component_status(
          cluster_id, service_name, component_name, "STATUS", report=True
        )