in ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/HDFS/package/alerts/alert_metrics_deviation.py [0:0]
def execute(configurations={}, parameters={}, host_name=None):
    """
    Returns a tuple containing the result code and a pre-formatted result label

    Keyword arguments:
    configurations : a mapping of configuration key to value
    parameters : a mapping of script parameter key to value
    host_name : the name of this host where the alert is running

    :type configurations dict
    :type parameters dict
    :type host_name str
    """
    hostnames = host_name
    current_time = int(time.time()) * 1000

    # parse script arguments
    connection_timeout = CONNECTION_TIMEOUT_DEFAULT
    if CONNECTION_TIMEOUT_KEY in parameters:
        connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])

    merge_ha_metrics = MERGE_HA_METRICS_PARAM_DEFAULT
    if MERGE_HA_METRICS_PARAM_KEY in parameters:
        merge_ha_metrics = parameters[MERGE_HA_METRICS_PARAM_KEY].lower() == "true"

    metric_name = METRIC_NAME_PARAM_DEFAULT
    if METRIC_NAME_PARAM_KEY in parameters:
        metric_name = parameters[METRIC_NAME_PARAM_KEY]

    metric_units = METRIC_UNITS_DEFAULT
    if METRIC_UNITS_PARAM_KEY in parameters:
        metric_units = parameters[METRIC_UNITS_PARAM_KEY]

    app_id = APP_ID_PARAM_DEFAULT
    if APP_ID_PARAM_KEY in parameters:
        app_id = parameters[APP_ID_PARAM_KEY]

    interval = INTERVAL_PARAM_DEFAULT
    if INTERVAL_PARAM_KEY in parameters:
        interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY])

    warning_threshold = DEVIATION_WARNING_THRESHOLD_DEFAULT
    if DEVIATION_WARNING_THRESHOLD_KEY in parameters:
        warning_threshold = _coerce_to_integer(parameters[DEVIATION_WARNING_THRESHOLD_KEY])

    critical_threshold = DEVIATION_CRITICAL_THRESHOLD_DEFAULT
    if DEVIATION_CRITICAL_THRESHOLD_KEY in parameters:
        critical_threshold = _coerce_to_integer(
            parameters[DEVIATION_CRITICAL_THRESHOLD_KEY]
        )

    minimum_value_threshold = None
    if MINIMUM_VALUE_THRESHOLD_KEY in parameters:
        minimum_value_threshold = _coerce_to_integer(
            parameters[MINIMUM_VALUE_THRESHOLD_KEY]
        )
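    # Note: the warning/critical thresholds are interpreted as a percent deviation (see
    # deviation_percent below), while the minimum value threshold is in the metric's own
    # units and is scaled by minimum_value_multiplier before filtering data points.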
    # parse configuration
    if configurations is None:
        return (
            RESULT_STATE_UNKNOWN,
            ["There were no configurations supplied to the script."],
        )

    # hdfs-site is required
    if not HDFS_SITE_KEY in configurations:
        return (
            RESULT_STATE_UNKNOWN,
            [f"{HDFS_SITE_KEY} is a required parameter for the script"],
        )

    if (
        METRICS_COLLECTOR_VIP_HOST_KEY in configurations
        and METRICS_COLLECTOR_VIP_PORT_KEY in configurations
    ):
        collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY].split(",")[0]
        collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
    else:
        # ams-site/timeline.metrics.service.webapp.address is required
        if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations:
            return (
                RESULT_STATE_UNKNOWN,
                [
                    f"{METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY} is a required parameter for the script"
                ],
            )
        else:
            collector_webapp_address = configurations[
                METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY
            ].split(":")
            if valid_collector_webapp_address(collector_webapp_address):
                collector_host = select_metric_collector_for_sink(app_id.lower())
                collector_port = int(collector_webapp_address[1])
            else:
                return (
                    RESULT_STATE_UNKNOWN,
                    [
                        '{0} value should be set as "fqdn_hostname:port", but set to {1}'.format(
                            METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY,
                            configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY],
                        )
                    ],
                )

    namenode_service_rpc_address = None
    # hdfs-site is required
    if not HDFS_SITE_KEY in configurations:
        return (
            RESULT_STATE_UNKNOWN,
            [f"{HDFS_SITE_KEY} is a required parameter for the script"],
        )
    hdfs_site = configurations[HDFS_SITE_KEY]

    if "dfs.namenode.servicerpc-address" in hdfs_site:
        namenode_service_rpc_address = hdfs_site["dfs.namenode.servicerpc-address"]

    # if namenode alert and HA mode
    if NAMESERVICE_KEY in configurations and app_id.lower() == "namenode":
        # hdfs-site is required
        if not HDFS_SITE_KEY in configurations:
            return (
                RESULT_STATE_UNKNOWN,
                [f"{HDFS_SITE_KEY} is a required parameter for the script"],
            )

        if SMOKEUSER_KEY in configurations:
            smokeuser = configurations[SMOKEUSER_KEY]

        executable_paths = None
        if EXECUTABLE_SEARCH_PATHS in configurations:
            executable_paths = configurations[EXECUTABLE_SEARCH_PATHS]

        # parse script arguments
        security_enabled = False
        if SECURITY_ENABLED_KEY in configurations:
            security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == "TRUE"

        kerberos_keytab = None
        if KERBEROS_KEYTAB in configurations:
            kerberos_keytab = configurations[KERBEROS_KEYTAB]

        kerberos_principal = None
        if KERBEROS_PRINCIPAL in configurations:
            kerberos_principal = configurations[KERBEROS_PRINCIPAL]
            kerberos_principal = kerberos_principal.replace("_HOST", host_name)

        # determine whether or not SSL is enabled
        is_ssl_enabled = False
        if DFS_POLICY_KEY in configurations:
            dfs_policy = configurations[DFS_POLICY_KEY]
            if dfs_policy == "HTTPS_ONLY":
                is_ssl_enabled = True

        kinit_timer_ms = parameters.get(
            KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS
        )

        name_service = get_name_service_by_hostname(hdfs_site, host_name)

        # look for dfs.ha.namenodes.foo
        nn_unique_ids_key = "dfs.ha.namenodes." + name_service
        if not nn_unique_ids_key in hdfs_site:
            return (
                RESULT_STATE_UNKNOWN,
                [f"Unable to find unique NameNode alias key {nn_unique_ids_key}"],
            )

        namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}"
        jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*"

        if is_ssl_enabled:
            namenode_http_fragment = "dfs.namenode.https-address.{0}.{1}"
            jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*"
        # now we have something like 'nn1,nn2,nn3,nn4'
        # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id]
        # ie dfs.namenode.http-address.hacluster.nn1
        namenodes = []
        active_namenodes = []
        nn_unique_ids = hdfs_site[nn_unique_ids_key].split(",")
        for nn_unique_id in nn_unique_ids:
            key = namenode_http_fragment.format(name_service, nn_unique_id)

            if key in hdfs_site:
                # use str() to ensure that unicode strings do not have the u' in them
                value = str(hdfs_site[key])
                namenode = str(hdfs_site[key]).split(":")[0]
                namenodes.append(namenode)

                try:
                    jmx_uri = jmx_uri_fragment.format(value)
                    if (
                        kerberos_principal is not None
                        and kerberos_keytab is not None
                        and security_enabled
                    ):
                        env = Environment.get_instance()

                        # curl requires an integer timeout
                        curl_connection_timeout = int(connection_timeout)
                        state_response, error_msg, time_millis = curl_krb_request(
                            env.tmp_dir,
                            kerberos_keytab,
                            kerberos_principal,
                            jmx_uri,
                            "ha_nn_health",
                            executable_paths,
                            False,
                            "NameNode High Availability Health",
                            smokeuser,
                            connection_timeout=curl_connection_timeout,
                            kinit_timer_ms=kinit_timer_ms,
                        )

                        state = _get_ha_state_from_json(state_response)
                    else:
                        state = _get_state_from_jmx(jmx_uri, connection_timeout)

                    if state == HDFS_NN_STATE_ACTIVE:
                        active_namenodes.append(namenode)

                        # Only check active NN
                        nn_service_rpc_address_key = (
                            f"dfs.namenode.servicerpc-address.{name_service}.{nn_unique_id}"
                        )
                        if nn_service_rpc_address_key in hdfs_site:
                            namenode_service_rpc_address = hdfs_site[nn_service_rpc_address_key]
                    pass
                except:
                    logger.exception("Unable to determine the active NameNode")
        pass

        if merge_ha_metrics:
            hostnames = ",".join(namenodes)
            # run only on active NN, no need to run the same requests from the standby
            if host_name not in active_namenodes:
                return (RESULT_STATE_SKIPPED, ["This alert will be reported by another host."])
        pass
    # Skip service rpc alert if port is not enabled
    if not namenode_service_rpc_address and "rpc.rpc.datanode" in metric_name:
        return (RESULT_STATE_SKIPPED, ["Service RPC port is not enabled."])

    get_metrics_parameters = {
        "metricNames": metric_name,
        "appId": app_id,
        "hostname": hostnames,
        "startTime": current_time - interval * 60 * 1000,
        "endTime": current_time,
        "grouped": "true",
    }
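    # Illustrative example (values are made up): with metric_name
    # "rpc.rpc.client.RpcQueueTimeAvgTime", app_id "NAMENODE" and a 60-minute interval,
    # the encoded query resembles
    #   metricNames=rpc.rpc.client.RpcQueueTimeAvgTime&appId=NAMENODE&hostname=nn1.example.com
    #   &startTime=1700000000000&endTime=1700003600000&grouped=true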
    encoded_get_metrics_parameters = urllib.parse.urlencode(get_metrics_parameters)

    ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf"
    metric_truststore_ca_certs = "ca.pem"
    ca_certs = os.path.join(ams_monitor_conf_dir, metric_truststore_ca_certs)
    metric_collector_https_enabled = str(configurations[AMS_HTTP_POLICY]) == "HTTPS_ONLY"

    _ssl_version = _get_ssl_version()
    try:
        conn = network.get_http_connection(
            collector_host,
            int(collector_port),
            metric_collector_https_enabled,
            ca_certs,
            ssl_version=_ssl_version,
        )
        conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
        response = conn.getresponse()
        data = response.read()
        conn.close()
    except Exception as e:
        logger.info(str(e))
        return (
            RESULT_STATE_UNKNOWN,
            ["Unable to retrieve metrics from the Ambari Metrics service."],
        )

    if response.status != 200:
        logger.info(str(data))
        return (
            RESULT_STATE_UNKNOWN,
            ["Unable to retrieve metrics from the Ambari Metrics service."],
        )

    data_json = json.loads(data)
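    # The collector response is expected to look roughly like the following (shape
    # inferred from the parsing below; values are made up):
    #   {"metrics": [{"metricname": "...", "hostname": "...",
    #                 "metrics": {"1700000000000": 12.5, "1700000060000": 13.1}}]}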
    metrics = []
    # will get large standard deviation for multiple hosts,
    # if host1 reports small local values, but host2 reports large local values
    for metrics_data in data_json["metrics"]:
        metrics += metrics_data["metrics"].values()
    pass

    if not metrics or len(metrics) < 2:
        number_of_data_points = len(metrics) if metrics else 0
        return (
            RESULT_STATE_SKIPPED,
            [
                "There are not enough data points to calculate the standard deviation ({0} sampled)".format(
                    number_of_data_points
                )
            ],
        )

    minimum_value_multiplier = 1
    if "dfs.FSNamesystem.CapacityUsed" in metric_name:
        minimum_value_multiplier = 1024 * 1024  # MB to bytes
    elif "rpc.rpc.datanode" in metric_name or "rpc.rpc.client" in metric_name:
        minimum_value_multiplier = 1000  # seconds to millis

    if minimum_value_threshold:
        # Filter out points below min threshold
        metrics = [
            metric
            for metric in metrics
            if metric > (minimum_value_threshold * minimum_value_multiplier)
        ]
        if len(metrics) < 2:
            return (
                RESULT_STATE_OK,
                [
                    f"There were no data points above the minimum threshold of {minimum_value_threshold} seconds"
                ],
            )

    mean_value = mean(metrics)
    stddev = sample_standard_deviation(metrics)
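    # deviation_percent is the coefficient of variation: the sample standard deviation
    # expressed as a percentage of the mean.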
    try:
        deviation_percent = stddev / float(mean_value) * 100
    except ZeroDivisionError:
        # should not be a case for this alert
        return (
            RESULT_STATE_SKIPPED,
            ["Unable to calculate the standard deviation because the mean value is 0"],
        )

    # log the AMS request
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"""