def execute()

in ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/HDFS/package/alerts/alert_metrics_deviation.py

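A quick way to see the function's contract without a live cluster is its early-exit path for missing configurations. The snippet below is a hedged sketch, assuming the module's constants are importable; the host name is a placeholder:

    # Illustrative only: exercises the guard that returns UNKNOWN when no
    # configurations are supplied (see the start of the function body).
    result_code, labels = execute(
      configurations=None, parameters={}, host_name="nn1.example.com"
    )
    # result_code == RESULT_STATE_UNKNOWN
    # labels == ["There were no configurations supplied to the script."]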

def execute(configurations={}, parameters={}, host_name=None):
  """
  Returns a tuple containing the result code and a pre-formatted result label

  Keyword arguments:
  configurations : a mapping of configuration key to value
  parameters : a mapping of script parameter key to value
  host_name : the name of this host where the alert is running

  :type configurations: dict
  :type parameters: dict
  :type host_name: str
  """
  hostnames = host_name
  current_time = int(time.time()) * 1000

  # parse script arguments
  connection_timeout = CONNECTION_TIMEOUT_DEFAULT
  if CONNECTION_TIMEOUT_KEY in parameters:
    connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])

  merge_ha_metrics = MERGE_HA_METRICS_PARAM_DEFAULT
  if MERGE_HA_METRICS_PARAM_KEY in parameters:
    merge_ha_metrics = parameters[MERGE_HA_METRICS_PARAM_KEY].lower() == "true"

  metric_name = METRIC_NAME_PARAM_DEFAULT
  if METRIC_NAME_PARAM_KEY in parameters:
    metric_name = parameters[METRIC_NAME_PARAM_KEY]

  metric_units = METRIC_UNITS_DEFAULT
  if METRIC_UNITS_PARAM_KEY in parameters:
    metric_units = parameters[METRIC_UNITS_PARAM_KEY]

  app_id = APP_ID_PARAM_DEFAULT
  if APP_ID_PARAM_KEY in parameters:
    app_id = parameters[APP_ID_PARAM_KEY]

  interval = INTERVAL_PARAM_DEFAULT
  if INTERVAL_PARAM_KEY in parameters:
    interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY])

  warning_threshold = DEVIATION_WARNING_THRESHOLD_DEFAULT
  if DEVIATION_WARNING_THRESHOLD_KEY in parameters:
    warning_threshold = _coerce_to_integer(parameters[DEVIATION_WARNING_THRESHOLD_KEY])

  critical_threshold = DEVIATION_CRITICAL_THRESHOLD_DEFAULT
  if DEVIATION_CRITICAL_THRESHOLD_KEY in parameters:
    critical_threshold = _coerce_to_integer(
      parameters[DEVIATION_CRITICAL_THRESHOLD_KEY]
    )

  minimum_value_threshold = None
  if MINIMUM_VALUE_THRESHOLD_KEY in parameters:
    minimum_value_threshold = _coerce_to_integer(
      parameters[MINIMUM_VALUE_THRESHOLD_KEY]
    )

  # parse configuration
  if configurations is None:
    return (
      RESULT_STATE_UNKNOWN,
      ["There were no configurations supplied to the script."],
    )

  # hdfs-site is required
  if HDFS_SITE_KEY not in configurations:
    return (
      RESULT_STATE_UNKNOWN,
      [f"{HDFS_SITE_KEY} is a required parameter for the script"],
    )

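  # resolve the metrics collector endpoint: prefer an explicitly configured
  # VIP host/port pair, otherwise fall back to the collector webapp address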
  if (
    METRICS_COLLECTOR_VIP_HOST_KEY in configurations
    and METRICS_COLLECTOR_VIP_PORT_KEY in configurations
  ):
    collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY].split(",")[0]
    collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
  else:
    # ams-site/timeline.metrics.service.webapp.address is required
    if METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY not in configurations:
      return (
        RESULT_STATE_UNKNOWN,
        [
          f"{METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY} is a required parameter for the script"
        ],
      )
    else:
      collector_webapp_address = configurations[
        METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY
      ].split(":")
      if valid_collector_webapp_address(collector_webapp_address):
        collector_host = select_metric_collector_for_sink(app_id.lower())
        collector_port = int(collector_webapp_address[1])
      else:
        return (
          RESULT_STATE_UNKNOWN,
          [
            f'{METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY} value should be set as "fqdn_hostname:port", but is set to {configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY]}'
          ],
        )

  namenode_service_rpc_address = None
  # hdfs-site was already validated above
  hdfs_site = configurations[HDFS_SITE_KEY]

  if "dfs.namenode.servicerpc-address" in hdfs_site:
    namenode_service_rpc_address = hdfs_site["dfs.namenode.servicerpc-address"]

  # if this is a NameNode alert and HA mode is configured
  if NAMESERVICE_KEY in configurations and app_id.lower() == "namenode":

    # default to None so curl_krb_request below cannot raise a NameError when
    # the smoke user is not configured
    smokeuser = None
    if SMOKEUSER_KEY in configurations:
      smokeuser = configurations[SMOKEUSER_KEY]

    executable_paths = None
    if EXECUTABLE_SEARCH_PATHS in configurations:
      executable_paths = configurations[EXECUTABLE_SEARCH_PATHS]

    # parse script arguments
    security_enabled = False
    if SECURITY_ENABLED_KEY in configurations:
      security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == "TRUE"

    kerberos_keytab = None
    if KERBEROS_KEYTAB in configurations:
      kerberos_keytab = configurations[KERBEROS_KEYTAB]

    kerberos_principal = None
    if KERBEROS_PRINCIPAL in configurations:
      kerberos_principal = configurations[KERBEROS_PRINCIPAL]
      kerberos_principal = kerberos_principal.replace("_HOST", host_name)

    # determine whether or not SSL is enabled
    is_ssl_enabled = False
    if DFS_POLICY_KEY in configurations:
      dfs_policy = configurations[DFS_POLICY_KEY]
      if dfs_policy == "HTTPS_ONLY":
        is_ssl_enabled = True

    kinit_timer_ms = parameters.get(
      KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS
    )

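    # resolve which nameservice this NameNode belongs to so the HA alias
    # properties below can be looked up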
    name_service = get_name_service_by_hostname(hdfs_site, host_name)

    # look for dfs.ha.namenodes.foo
    nn_unique_ids_key = "dfs.ha.namenodes." + name_service
    if nn_unique_ids_key not in hdfs_site:
      return (
        RESULT_STATE_UNKNOWN,
        [f"Unable to find unique NameNode alias key {nn_unique_ids_key}"],
      )

    namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}"
    jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*"

    if is_ssl_enabled:
      namenode_http_fragment = "dfs.namenode.https-address.{0}.{1}"
      jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*"

    # now we have something like 'nn1,nn2,nn3,nn4'
    # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id]
    # i.e. dfs.namenode.http-address.hacluster.nn1
    namenodes = []
    active_namenodes = []
    nn_unique_ids = hdfs_site[nn_unique_ids_key].split(",")
    for nn_unique_id in nn_unique_ids:
      key = namenode_http_fragment.format(name_service, nn_unique_id)

      if key in hdfs_site:
        # normalize to str and take the host portion of host:port
        value = str(hdfs_site[key])
        namenode = value.split(":")[0]

        namenodes.append(namenode)
        try:
          jmx_uri = jmx_uri_fragment.format(value)
          if (
            kerberos_principal is not None
            and kerberos_keytab is not None
            and security_enabled
          ):
            env = Environment.get_instance()

            # curl requires an integer timeout
            curl_connection_timeout = int(connection_timeout)
            state_response, error_msg, time_millis = curl_krb_request(
              env.tmp_dir,
              kerberos_keytab,
              kerberos_principal,
              jmx_uri,
              "ha_nn_health",
              executable_paths,
              False,
              "NameNode High Availability Health",
              smokeuser,
              connection_timeout=curl_connection_timeout,
              kinit_timer_ms=kinit_timer_ms,
            )

            state = _get_ha_state_from_json(state_response)
          else:
            state = _get_state_from_jmx(jmx_uri, connection_timeout)

          if state == HDFS_NN_STATE_ACTIVE:
            active_namenodes.append(namenode)

            # Only check active NN
            nn_service_rpc_address_key = (
              f"dfs.namenode.servicerpc-address.{name_service}.{nn_unique_id}"
            )
            if nn_service_rpc_address_key in hdfs_site:
              namenode_service_rpc_address = hdfs_site[nn_service_rpc_address_key]
        except Exception:
          logger.exception("Unable to determine the active NameNode")

    if merge_ha_metrics:
      hostnames = ",".join(namenodes)
      # run only on active NN, no need to run the same requests from the standby
      if host_name not in active_namenodes:
        return (RESULT_STATE_SKIPPED, ["This alert will be reported by another host."])

  # Skip service rpc alert if port is not enabled
  if not namenode_service_rpc_address and "rpc.rpc.datanode" in metric_name:
    return (RESULT_STATE_SKIPPED, ["Service RPC port is not enabled."])

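  # query AMS for metric_name on hostnames over the trailing `interval`
  # minutes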
  get_metrics_parameters = {
    "metricNames": metric_name,
    "appId": app_id,
    "hostname": hostnames,
    "startTime": current_time - interval * 60 * 1000,
    "endTime": current_time,
    "grouped": "true",
  }

  encoded_get_metrics_parameters = urllib.parse.urlencode(get_metrics_parameters)

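  # when the collector is HTTPS-only, validate the connection against the
  # metrics monitor's CA bundle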
  ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf"
  metric_truststore_ca_certs = "ca.pem"
  ca_certs = os.path.join(ams_monitor_conf_dir, metric_truststore_ca_certs)
  metric_collector_https_enabled = str(configurations[AMS_HTTP_POLICY]) == "HTTPS_ONLY"

  _ssl_version = _get_ssl_version()
  try:
    conn = network.get_http_connection(
      collector_host,
      int(collector_port),
      metric_collector_https_enabled,
      ca_certs,
      ssl_version=_ssl_version,
    )
    conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
    response = conn.getresponse()
    data = response.read()
    conn.close()
  except Exception as e:
    logger.info(str(e))
    return (
      RESULT_STATE_UNKNOWN,
      ["Unable to retrieve metrics from the Ambari Metrics service."],
    )

  if response.status != 200:
    logger.info(str(data))
    return (
      RESULT_STATE_UNKNOWN,
      ["Unable to retrieve metrics from the Ambari Metrics service."],
    )

  data_json = json.loads(data)
  metrics = []
  # pooling several hosts can inflate the standard deviation when one host
  # reports small local values while another reports large ones
  for metrics_data in data_json["metrics"]:
    metrics += metrics_data["metrics"].values()

  if len(metrics) < 2:
    number_of_data_points = len(metrics)
    return (
      RESULT_STATE_SKIPPED,
      [
        f"There are not enough data points to calculate the standard deviation ({number_of_data_points} sampled)"
      ],
    )

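  # the minimum threshold is given in display units (MB or seconds); scale it
  # to the raw units the metric reports before filtering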
  minimum_value_multiplier = 1
  if "dfs.FSNamesystem.CapacityUsed" in metric_name:
    minimum_value_multiplier = 1024 * 1024  # MB to bytes
  elif "rpc.rpc.datanode" in metric_name or "rpc.rpc.client" in metric_name:
    minimum_value_multiplier = 1000  # seconds to millis

  if minimum_value_threshold:
    # Filter out points below min threshold
    metrics = [
      metric
      for metric in metrics
      if metric > (minimum_value_threshold * minimum_value_multiplier)
    ]
    if len(metrics) < 2:
      return (
        RESULT_STATE_OK,
        [
          f"There were no data points above the minimum threshold of {minimum_value_threshold} seconds"
        ],
      )

  mean_value = mean(metrics)
  stddev = sample_standard_deviation(metrics)

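  # deviation_percent is the sample coefficient of variation, expressed as a
  # percentage of the mean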
  try:
    deviation_percent = stddev / float(mean_value) * 100
  except ZeroDivisionError:
    # the mean should never be zero for this alert, but guard against it anyway
    return (
      RESULT_STATE_SKIPPED,
      ["Unable to calculate the standard deviation because the mean value is 0"],
    )

  # log the AMS request
  if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"""