contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-MySQL-CREATE.ddl (351 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ /* Deployment script for HadoopMetrics */ delimiter ; # CREATE DATABASE `ambarimetrics` /*!40100 DEFAULT CHARACTER SET utf8 */; # # CREATE USER 'ambari' IDENTIFIED BY 'bigdata'; # USE @schema; CREATE TABLE CompletedJob (ClusterNodeID INTEGER NOT NULL, TagSetID INTEGER NOT NULL, MapProgressPercent INTEGER NOT NULL, CleanupProgressPercent INTEGER NOT NULL, SetupProgressPercent INTEGER NOT NULL, ReduceProgressPercent INTEGER NOT NULL, RunState INTEGER NOT NULL, StartTime DATETIME NOT NULL, EndTime DATETIME NOT NULL, PRIMARY KEY(ClusterNodeID, TagSetID)); create index IX_CompletedJob_EndTime on CompletedJob(EndTime); create index IX_CompletedJob_TagSetID on CompletedJob(TagSetID); CREATE TABLE Configuration (RequestedRefreshRate INTEGER NOT NULL); CREATE TABLE DatabaseVersion (Major INTEGER NOT NULL, Minor INTEGER NOT NULL, Build INTEGER NOT NULL, Revision INTEGER NOT NULL); CREATE TABLE MetricName (MetricID INTEGER NOT NULL AUTO_INCREMENT, Name VARCHAR(255), PRIMARY KEY(MetricID)); create index IX_MetricName_Name on MetricName(Name); CREATE TABLE MetricPair (RecordID BIGINT NOT NULL, MetricID INTEGER NOT NULL, MetricValue VARCHAR(512) NOT NULL, PRIMARY KEY(RecordID, MetricID)); CREATE TABLE MetricRecord (RecordID BIGINT NOT NULL AUTO_INCREMENT, RecordTypeID INTEGER NOT NULL, NodeID INTEGER NOT NULL, SourceIP VARCHAR(255), ClusterNodeID INTEGER NOT NULL, ServiceID INTEGER NOT NULL, TagSetID INTEGER NOT NULL, RecordTimestamp BIGINT NOT NULL, RecordDate DATETIME, PRIMARY KEY(RecordID)); create index IX_MetricRecord_ClusterNodeID on MetricRecord(ClusterNodeID); create index IX_MetricRecord_NodeID_RecordID on MetricRecord(RecordID); create index IX_MetricRecord_NodeID_RecordTypeID_ClusterNodeID on MetricRecord(NodeID, RecordTypeID, ClusterNodeID); create index IX_MetricRecord_NodeID_TagSetID on MetricRecord(TagSetID); CREATE TABLE Service (ServiceID BIGINT NOT NULL AUTO_INCREMENT, Name VARCHAR(255), PRIMARY KEY(ServiceID)); CREATE TABLE Node (NodeID INTEGER NOT NULL AUTO_INCREMENT, Name VARCHAR(255), LastKnownIP VARCHAR(255), LastNameNodeHeartBeat DATETIME, LastJobTrackerHeartBeat DATETIME, LastDataNodeHeartBeat DATETIME, LastTaskTrackerHeartBeat DATETIME, PRIMARY KEY(NodeID)); create index IX_Node_Name on Node(Name); CREATE TABLE RecordType (RecordTypeID INTEGER NOT NULL AUTO_INCREMENT, Name VARCHAR(255), Context VARCHAR(255), PRIMARY KEY(RecordTypeID)); create index IX_RecordType_Context_Name on RecordType(Context, Name); CREATE TABLE TagSet (TagSetID INTEGER NOT NULL AUTO_INCREMENT, TagPairs VARCHAR(512), PRIMARY KEY(TagSetID)); create index IX_TagSet_TagPairs on TagSet(TagPairs); ALTER TABLE CompletedJob ADD CONSTRAINT FK_CompletedJob_TagSet_TagSetID FOREIGN KEY (TagSetID) REFERENCES TagSet (TagSetID) ON DELETE NO ACTION ON UPDATE NO ACTION; ALTER TABLE MetricPair ADD CONSTRAINT FK_MetricPair_MetricName_MetricID FOREIGN KEY (MetricID) REFERENCES MetricName (MetricID) ON DELETE NO ACTION ON UPDATE NO ACTION; ALTER TABLE MetricPair ADD CONSTRAINT FK_MetricPair_MetricRecord_RecordID FOREIGN KEY (RecordID) REFERENCES MetricRecord (RecordID) ON DELETE NO ACTION ON UPDATE NO ACTION; ALTER TABLE MetricRecord ADD CONSTRAINT FK_MetricRecord_Node_NodeID FOREIGN KEY (NodeID) REFERENCES Node (NodeID) ON DELETE NO ACTION ON UPDATE NO ACTION; ALTER TABLE MetricRecord ADD CONSTRAINT FK_MetricRecord_RecordType_RecordTypeID FOREIGN KEY (RecordTypeID) REFERENCES RecordType (RecordTypeID) ON DELETE NO ACTION ON UPDATE NO ACTION; ALTER TABLE MetricRecord ADD CONSTRAINT FK_MetricRecord_TagSet_TagSetID FOREIGN KEY (TagSetID) REFERENCES TagSet (TagSetID) ON DELETE NO ACTION ON UPDATE NO ACTION; DROP procedure IF EXISTS `uspInsertMetricValue`; DELIMITER $$ CREATE DEFINER=`ambari`@`%` PROCEDURE `uspInsertMetricValue`(recordID bigint, metricName nvarchar(256), metricValue nvarchar(512)) proc_label:BEGIN DECLARE metricID INT; DECLARE has_error INT DEFAULT 0; DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET has_error = 1; IF (recordID IS NULL OR metricName IS NULL) THEN LEAVE proc_label; END IF; START TRANSACTION; SELECT MetricID FROM MetricName WHERE `Name` = metricName INTO metricID; IF (metricID IS NULL) THEN INSERT INTO MetricName (`Name`) VALUES (metricName); SELECT LAST_INSERT_ID() INTO metricID; IF (has_error = 1) THEN ROLLBACK; LEAVE proc_label; END IF; END IF; COMMIT; INSERT INTO MetricPair (RecordID, MetricID, MetricValue) VALUES (recordID, metricID, metricValue); LEAVE proc_label; END; $$ DELIMITER ; DROP procedure IF EXISTS `uspUpdateHeartBeats`; DELIMITER $$ CREATE PROCEDURE uspUpdateHeartBeats(NodeID int, SourceIP varchar(256), NameNodeLast datetime, JobTrackerLast datetime, DataNodeLast datetime, TaskTrackerLast datetime, LastKnownIP varchar(256)) BEGIN IF (NodeID IS NOT NULL) THEN IF (NameNodeLast IS NOT NULL) THEN UPDATE Node as n SET n.LastNameNodeHeartBeat = NameNodeLast WHERE n.NodeID = NodeID; END IF; IF (JobTrackerLast IS NOT NULL) THEN UPDATE Node as n SET n.LastJobTrackerHeartBeat = JobTrackerLast WHERE n.NodeID = NodeID; END IF; IF (DataNodeLast IS NOT NULL) THEN UPDATE Node as n SET n.LastDataNodeHeartBeat = DataNodeLast WHERE n.NodeID = NodeID; END IF; IF (TaskTrackerLast IS NOT NULL) THEN UPDATE Node as n SET n.LastTaskTrackerHeartBeat = TaskTrackerLast WHERE n.NodeID = NodeID; END IF; IF (LastKnownIP IS NULL OR SourceIP <> LastKnownIP) THEN UPDATE Node as n SET n.LastKnownIP = SourceIP WHERE n.NodeID = NodeID; END IF; END IF; END; $$ DELIMITER ; DROP procedure IF EXISTS `uspGetMetricRecord`; DELIMITER $$ CREATE DEFINER=`ambari`@`%` PROCEDURE `uspGetMetricRecord`( recordTypeContext varchar(255), recordTypeName varchar(255), nodeName varchar(255), sourceIP varchar(255), clusterNodeName varchar(255), serviceName varchar(255), tagPairs varchar(512), recordTimestamp bigint, OUT metricRecordID bigint) proc_label: BEGIN DECLARE recordTypeID int; DECLARE nodeID int; DECLARE clusterNodeID int; DECLARE tagSetID int; DECLARE serviceID int; DECLARE err int default 0; DECLARE recordIDCutoff bigint; DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET err = 1; START TRANSACTION; SELECT max(r.RecordTypeID) INTO recordTypeID FROM RecordType as r WHERE r.`Context` = recordTypeContext AND r.`Name` = recordTypeName; IF (recordTypeID IS NULL OR recordTypeID = 0) THEN INSERT INTO RecordType (`Context`, `Name`) VALUES (recordTypeContext, recordTypeName); SELECT LAST_INSERT_ID() INTO recordTypeID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; COMMIT; START TRANSACTION; SELECT max(s.serviceID) INTO serviceID FROM Service as s WHERE s.`Name` = serviceName; IF (serviceID IS NULL OR serviceID = 0) THEN INSERT INTO Service (`Name`) VALUES (serviceName); SELECT LAST_INSERT_ID() INTO serviceID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; COMMIT; START TRANSACTION; SELECT max(n.NodeID) INTO nodeID FROM Node as n WHERE n.`Name` = nodeName; IF (nodeID IS NULL OR nodeID = 0) THEN /* Start with a node type of uninitialized. HealthNode will determine node type based on metrics delivered over time. */ INSERT INTO Node (`Name`, LastKnownIP) VALUES (nodeName, sourceIP); SELECT LAST_INSERT_ID() INTO nodeID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; COMMIT; -- Do our best to determine the cluster node ID based on completely flakey input from user which might be an IP address, a non-FQDN, -- or an FQDN. Note that worker nodes may have a completely different idea about the name of the namenode (which is the node -- which represents the cluster) compared with the namenode itself START TRANSACTION; IF ((SELECT ufnIsIPAddress(clusterNodeName)) = 1) THEN SELECT n.NodeID from Node as n WHERE n.LastKnownIP = clusterNodeName ORDER BY n.LastNameNodeHeartBeat DESC limit 1 INTO clusterNodeID; IF (clusterNodeID IS NULL) THEN INSERT INTO Node (`Name`, LastKnownIP) VALUES (clusterNodeName, sourceIP); SELECT LAST_INSERT_ID() INTO clusterNodeID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; ELSEIF ((SELECT LOCATE('.', clusterNodeName, 1)) > 0) THEN -- IF this is not an IP address, but there is a dot in the name we assume we are looking at an FQDN SELECT max(n.NodeID) FROM Node as n WHERE n.`Name` = clusterNodeName INTO clusterNodeID; IF (clusterNodeID IS NULL OR clusterNodeID = 0) THEN INSERT INTO Node (`Name`, LastKnownIP) VALUES (clusterNodeName, sourceIP); SELECT LAST_INSERT_ID() INTO clusterNodeID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; ELSE BEGIN -- We have got a non-FQDN, but the NameNode might know its FQDN, so be careful! We must prefer the FQDN if we can find one. -- Sadly, yes, this could break things if we are monitoring clusters from different domains. This is now by design! SELECT n.NodeID FROM Node as n WHERE n.`Name` LIKE CONCAT(clusterNodeName, '%') ORDER BY n.LastNameNodeHeartBeat DESC limit 1 INTO clusterNodeID; IF (clusterNodeID IS NULL) THEN SELECT n.NodeID FROM Node as n WHERE n.`Name` = clusterNodeName INTO clusterNodeID; if (clusterNodeID IS NULL) THEN INSERT INTO Node (`Name`, LastKnownIP) VALUES (clusterNodeName, sourceIP); SELECT LAST_INSERT_ID() INTO clusterNodeID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; END IF; END; END IF; COMMIT; -- Cleanup older metric records and pairs if necessary -- Policy is to keep between 60000 and 90000 metric records and associated metric pairs per node. IF ((SELECT COUNT(*) FROM MetricRecord as mr WHERE mr.NodeID = nodeID) > 90000) THEN SELECT MIN(mr.RecordID) FROM MetricRecord as mr WHERE mr.RecordID IN (select * from (SELECT r.RecordID FROM MetricRecord as r WHERE r.NodeID = nodeID ORDER BY r.RecordDate DESC limit 60000) as records) INTO recordIDCutoff; IF (recordIDCutoff IS NOT NULL) THEN DELETE FROM MetricPair WHERE RecordID IN ( SELECT RecordID FROM MetricPair as mp JOIN MetricRecord as mr ON mp.RecordID = mr.RecordID WHERE mr.RecordID < @recordIDCutoff AND mr.NodeID = @nodeID); DELETE FROM MetricRecord WHERE RecordID < recordIDCutoff AND NodeID = nodeID; END IF; END IF; START TRANSACTION; SELECT max(t.TagSetID) FROM TagSet as t WHERE t.TagPairs = tagPairs INTO tagSetID; IF (tagSetID IS NULL OR tagSetID = 0) THEN INSERT INTO TagSet (TagPairs) VALUES (tagPairs); SELECT LAST_INSERT_ID() INTO tagSetID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; COMMIT; START TRANSACTION; SELECT max(mr.RecordID) FROM MetricRecord as mr WHERE mr.RecordTypeID = recordTypeID AND mr.NodeID = nodeID AND mr.ServiceID = serviceID AND mr.TagSetID = tagSetID AND mr.RecordTimestamp = recordTimestamp INTO metricRecordID; IF (metricRecordID IS NULL OR metricRecordID = 0) THEN -- insert into temp_log values (CONCAT(metricRecordID, ', ', RecordTimestamp)); INSERT INTO MetricRecord (RecordTypeID, NodeID, SourceIP, ClusterNodeID, ServiceID, TagSetID, RecordTimestamp) VALUES (recordTypeID, nodeID, sourceIP, clusterNodeID, serviceID, tagSetID, recordTimestamp); SELECT LAST_INSERT_ID() INTO metricRecordID; IF (err <> 0) THEN ROLLBACK; SET metricRecordID = NULL; LEAVE proc_label; END IF; END IF; COMMIT; END; $$ DELIMITER ; DELIMITER $$ CREATE DEFINER=`ambari`@`%` FUNCTION `ufnIsIPAddress`(inputString varchar(1024)) RETURNS tinyint(4) BEGIN DECLARE currentPos bigint default 1; DECLARE nextPos bigint default 0; DECLARE count int default 0; if (CHAR_LENGTH(inputString) = 0) THEN RETURN 0; END IF; SELECT LOCATE('.', inputString, currentPos) INTO nextPos; while_label: WHILE (nextPos < CHAR_LENGTH(inputString) AND count < 4) DO IF (nextPos = 0) THEN SET nextPos = CHAR_LENGTH(inputString); END IF; IF ((SELECT SUBSTRING(inputString, currentPos, nextPos - currentPos) REGEXP '[0-9]+') = 1) THEN SET count = count + 1; SET currentPos = nextPos; SELECT LOCATE(inputString, '.', currentPos + 1) INTO nextPos; ELSE LEAVE while_label; END IF; END WHILE; IF (count = 4) THEN RETURN 1; END IF; SET currentPos = 1; SET nextPos = 0; SET count = 0; WHILE (currentPos <= CHAR_LENGTH(inputString)) DO IF ((SELECT SUBSTRING(inputString, currentPos, 1) REGEXP '[0-9A-Fa-f:]') = 1) THEN IF (SUBSTRING(inputString, currentPos, 1) = ':') THEN SET count = count + 1; END IF; SET currentPos = currentPos + 1; ELSE RETURN 0; END IF; END WHILE; IF (count >= 4) THEN RETURN 1; END IF; RETURN 0; END; $$ DELIMITER ; DROP procedure IF EXISTS `uspPurgeMetrics`; DELIMITER $$ CREATE PROCEDURE uspPurgeMetrics(noOfDays bigint) proc_label: BEGIN DECLARE recordIDCutOff BIGINT; DECLARE has_error INT default 0; DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET has_error = 1; IF (noOfDays IS NULL OR noOfDays < 1) THEN LEAVE proc_label; END IF; SELECT MAX(RecordID) FROM MetricRecord WHERE DateDiff(RecordDate, CURRENT_TIMESTAMP) >= noOfDays INTO recordIDCutoff; IF (recordIDCutoff IS NOT NULL) THEN START TRANSACTION; DELETE FROM MetricPair WHERE RecordID <= recordIDCutoff; DELETE FROM MetricRecord WHERE RecordID <= recordIDCutoff; IF (has_error <> 0) then ROLLBACK; LEAVE proc_label; END IF; COMMIT; END IF; END; $$ DELIMITER ; DROP procedure IF EXISTS `ufGetMetrics`; DELIMITER $$ CREATE DEFINER=`ambari`@`%` PROCEDURE `ufGetMetrics`(startTimeStamp bigint, endTimeStamp bigint, recordTypeContext VARCHAR(256), recordTypeName VARCHAR(256), metricName VARCHAR(256), serviceComponentName VARCHAR(256), nodeName VARCHAR(256) ) BEGIN SELECT * from ( SELECT s.RecordTimeStamp AS RecordTimeStamp, mp.MetricValue AS MetricValue FROM MetricPair mp INNER JOIN (SELECT mr.RecordID AS RecordID, mr.RecordTimeStamp AS RecordTimeStamp FROM MetricRecord mr INNER JOIN RecordType rt ON (mr.RecordTypeId = rt.RecordTypeId) INNER JOIN Node nd ON (mr.NodeID = nd.NodeID) INNER JOIN Service sr ON (mr.ServiceID = sr.ServiceID) WHERE rt.Context = recordTypeContext AND rt.Name = recordTypeName AND (nd.Name = nodeName) AND (sr.Name = serviceComponentName) AND mr.RecordTimestamp >= startTimeStamp AND mr.RecordTimestamp <= endTimeStamp ) s ON (mp.RecordID = s.RecordID) INNER JOIN MetricName mn ON (mp.MetricID = mn.MetricID) WHERE (mn.Name = metricName) ) as mp_table; END; $$ DELIMITER ; DROP procedure IF EXISTS `ufGetAggregatedServiceMetrics`; DELIMITER $$ CREATE PROCEDURE ufGetAggregatedServiceMetrics( startTimeStamp bigint, endTimeStamp bigint, recordTypeContext NVARCHAR(256), recordTypeName NVARCHAR(256), metricName NVARCHAR(256), serviceComponentName NVARCHAR(256), period integer ) BEGIN SELECT * FROM ( SELECT FLOOR ((mr.RecordTimeStamp - startTimeStamp) / period) TimeStampBlock, MAX(mr.RecordTimeStamp) RecordTimeStamp, SUM(CAST(MetricValue AS DECIMAL(18,4))) AggMetricValue FROM MetricPair mp INNER JOIN MetricRecord mr ON (mp.RecordID = mr.RecordID) INNER JOIN RecordType rt ON (rt.RecordTypeID = mr.RecordTypeID) INNER JOIN MetricName mn ON (mn.MetricID = mp.MetricID) INNER JOIN Service sr ON (sr.ServiceID = mr.ServiceID) WHERE mr.RecordTimestamp >= startTimeStamp AND mr.RecordTimestamp <= endTimeStamp AND mn.Name = metricName AND rt.Context = recordTypeContext AND rt.Name = recordTypeName AND sr.Name = serviceComponentName GROUP BY FLOOR ((mr.RecordTimeStamp - startTimeStamp) / period) ) as mp_table; END; $$ DELIMITER ;