in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java [121:192]
/**
 * Sets up the Helix cluster used for metric collector HA and starts this node's participant
 * and controller.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Create a {@code ZKHelixAdmin} against {@code zkConnectUrl} and add {@code CLUSTER_NAME}
 *       without recreating it if it already exists.</li>
 *   <li>Register this node's instance, retrying up to 5 times (5 seconds apart) on
 *       {@code HelixException}/{@code ZkNoNodeException}. If every attempt fails, the cluster is
 *       re-added with recreation enabled and registration is attempted once more, without any
 *       further retry.</li>
 *   <li>Add the ONLINE-OFFLINE state model and the {@code METRIC_AGGREGATORS} resource if they
 *       are missing, then rebalance that resource with {@code REPLICATION_FACTOR} replicas.</li>
 *   <li>Start the aggregators (participant) and the controller, register a JVM shutdown hook
 *       that calls {@code shutdownHAController()}, and mark this controller initialized.</li>
 * </ol>
 *
 * @throws Exception if a Helix/ZooKeeper call outside the retry loop fails, or if the thread is
 *     interrupted while waiting between registration attempts
 */
public void initializeHAController() throws Exception {
  // Create setup tool instance
  admin = new ZKHelixAdmin(zkConnectUrl);
  // Create cluster namespace in zookeeper. Don't recreate if exists.
  LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
  boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false);
  LOG.info(String.format("Was cluster added successfully? %s", clusterAdded));

  // Adding host to the cluster. Registration can fail while the cluster is not yet set up
  // fully, so retry a bounded number of times before falling back.
  boolean success = false;
  int tries = 5;
  int sleepTimeInSeconds = 5;
  for (int i = 0; i < tries && !success; i++) {
    try {
      addParticipantInstanceIfAbsent();
      success = true;
    } catch (HelixException | ZkNoNodeException ex) {
      LOG.warn("Helix Cluster not yet setup fully.");
      if (i < tries - 1) {
        LOG.info(String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds));
        TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
      } else {
        // Pass the exception as the throwable argument so its stack trace is logged,
        // not only its toString().
        LOG.error(String.format("Failed to add participant instance %s to cluster %s after %d attempts.",
            instanceConfig.getInstanceName(), CLUSTER_NAME, tries), ex);
      }
    }
  }
  if (!success) {
    LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
    // Second argument true: recreate the cluster even if it already exists.
    admin.addCluster(CLUSTER_NAME, true);
    addParticipantInstanceIfAbsent();
  }

  // Add an ONLINE-OFFLINE state model
  if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
    LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
    admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
  }

  // Add resources with 1 cluster-wide replica
  // Since our aggregators are unbalanced in terms of work distribution we
  // only need to distribute writes to METRIC_AGGREGATE and
  // METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations
  List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
  if (!resources.contains(METRIC_AGGREGATORS)) {
    LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR));
    admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
  }
  // This will set up the ideal state, it calculates the preference list for each partition similar to consistent hashing.
  admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);

  // Start participant
  startAggregators();
  // Start controller
  startController();

  Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownHAController));
  isInitialized = true;
}

/**
 * Adds this node's {@code instanceConfig} to {@code CLUSTER_NAME} unless an instance with the
 * same name is already listed in the cluster.
 */
private void addParticipantInstanceIfAbsent() {
  List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
  if (!nodes.contains(instanceConfig.getInstanceName())) {
    LOG.info(String.format("Adding participant instance %s", instanceConfig));
    admin.addInstance(CLUSTER_NAME, instanceConfig);
  }
}