in contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/services/stack_advisor.py [0:0]
def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
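  """
  Validate ams-hbase-site against the AMS deployment mode.

  Checks that hbase.rootdir, hbase.cluster.distributed and
  hbase.zookeeper.property.clientPort are consistent with
  timeline.metrics.service.operation.mode (embedded vs. distributed),
  and warns about disk-space and mount-point conflicts on the
  METRICS_COLLECTOR host(s).
  """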
  amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
  ams_site = getSiteProperties(configurations, "ams-site")
  core_site = getSiteProperties(configurations, "core-site")

  collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts)
  recommendedDiskSpace = 10485760
  # TODO validate configuration for multiple AMBARI_METRICS collectors
  if len(amsCollectorHosts) > 1:
    pass
  else:
    if total_sinks_count > 2000:
      recommendedDiskSpace = 104857600  # * 1k == 100 Gb
    elif total_sinks_count > 500:
      recommendedDiskSpace = 52428800  # * 1k == 50 Gb
    elif total_sinks_count > 250:
      recommendedDiskSpace = 20971520  # * 1k == 20 Gb

  validationItems = []

  rootdir_item = None
  op_mode = ams_site.get("timeline.metrics.service.operation.mode")
  default_fs = core_site.get("fs.defaultFS") if core_site else "file:///"
  hbase_rootdir = properties.get("hbase.rootdir")
  hbase_tmpdir = properties.get("hbase.tmp.dir")
  distributed = properties.get("hbase.cluster.distributed")
  is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/"))
  if op_mode == "distributed" and is_local_root_dir:
    rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.")
  elif op_mode == "embedded":
    # In embedded mode neither a schemaless path nor an HDFS URI is acceptable for hbase.rootdir
    if distributed.lower() == "false" and (hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://")):
      rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS. "
                                      "Example - file:// for localFS")
  distributed_item = None
  if op_mode == "distributed" and not distributed.lower() == "true":
    distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for "
                                         "distributed mode")
  if op_mode == "embedded" and distributed.lower() == "true":
    distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode")

  hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort")
  zkPort = self.getZKPort(services)
  hbase_zk_client_port_item = None
  if distributed.lower() == "true" and op_mode == "distributed" and \
      hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
    hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort "
                                                  "should be the cluster zookeeper server port : {0}".format(zkPort))

  if distributed.lower() == "false" and op_mode == "embedded" and \
      hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
    hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort "
                                                  "should be a different port than cluster zookeeper port "
                                                  "(default: 61181)")

  validationItems.extend([{"config-name": 'hbase.rootdir', "item": rootdir_item},
                          {"config-name": 'hbase.cluster.distributed', "item": distributed_item},
                          {"config-name": 'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item}])
  for collectorHostName in amsCollectorHosts:
    for host in hosts["items"]:
      if host["Hosts"]["host_name"] == collectorHostName:
        if op_mode == 'embedded' or is_local_root_dir:
          validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}])
          validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}])
          validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}])

        dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
        if is_local_root_dir:
          mountPoints = []
          for mountPoint in host["Hosts"]["disk_info"]:
            mountPoints.append(mountPoint["mountpoint"])
          hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints)
          hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints)
          preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
          # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
          # if multiple preferred_mountpoints exist
          if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \
              len(preferred_mountpoints) > 1:
            item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. "
                                    "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint))
            validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": item}])

          # if METRICS_COLLECTOR is co-hosted with DATANODE
          # cross-check dfs.datanode.data.dir and hbase.rootdir
          # they shouldn't share same disk partition IO
          hdfs_site = getSiteProperties(configurations, "hdfs-site")
          dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else []
          if dn_hosts and collectorHostName in dn_hosts and ams_site and \
              dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
            for dfs_datadir in dfs_datadirs:
              dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints)
              if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
                item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
                                        "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
                validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}])
                break
        # If no local DN in distributed mode
        elif collectorHostName not in dn_hosts and distributed.lower() == "true":
          item = self.getWarnItem("It's recommended to install Datanode component on {0} "
                                  "to speed up IO operations between HDFS and Metrics "
                                  "Collector in distributed mode ".format(collectorHostName))
          validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}])
        # Short circuit read should be enabled in distributed mode
        # if local DN installed
        else:
          validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}])

  return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")