public void initializeHAController()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java [121:192]


  /**
   * Prepares the Helix cluster used for collector high availability and starts
   * this collector's participation in it.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Create the Helix admin client and the cluster node (without
   *       recreating an existing one).</li>
   *   <li>Register this instance in the cluster, retrying a fixed number of
   *       times with a fixed pause while the cluster is not yet readable.</li>
   *   <li>If every attempt failed, call {@code addCluster} again with the
   *       recreate flag set and register the instance once more; a failure
   *       here is not caught.</li>
   *   <li>Add the default state model and the aggregator resource if they are
   *       absent, then rebalance the resource.</li>
   *   <li>Start the aggregators and the controller, register a JVM shutdown
   *       hook that calls {@code shutdownHAController()}, and mark the
   *       controller as initialized.</li>
   * </ol>
   *
   * @throws Exception if any setup step fails, or if the thread is interrupted
   *                   while waiting between registration attempts
   */
  public void initializeHAController() throws Exception {
    // Create setup tool instance
    admin = new ZKHelixAdmin(zkConnectUrl);
    // Create cluster namespace in zookeeper. Don't recreate if exists.
    LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
    boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false);
    LOG.info(String.format("Was cluster added successfully? %s", clusterAdded));

    // Adding host to the cluster
    boolean success = false;
    int tries = 5;
    int sleepTimeInSeconds = 5;

    for (int i = 0; i < tries && !success; i++) {
      try {
        addParticipantInstanceIfAbsent();
        success = true;
      } catch (HelixException | ZkNoNodeException ex) {
        LOG.warn("Helix Cluster not yet setup fully.");
        if (i < tries - 1) {
          LOG.info(String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds));
          TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
        } else {
          // Last attempt: pass the exception as the throwable argument so the
          // stack trace is logged instead of only its toString() value.
          LOG.error(String.format("Failed to add participant instance to cluster %s after %d attempts.",
            CLUSTER_NAME, tries), ex);
        }
      }
    }

    if (!success) {
      // Waiting did not help: request cluster creation again, this time with
      // the recreate flag set, and register the instance once more. Exceptions
      // thrown here propagate to the caller.
      LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
      admin.addCluster(CLUSTER_NAME, true);
      addParticipantInstanceIfAbsent();
    }

    // Add an ONLINE-OFFLINE state model
    if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
      admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
    }

    // Add resources with 1 cluster-wide replica
    // Since our aggregators are unbalanced in terms of work distribution we
    // only need to distribute writes to METRIC_AGGREGATE and
    // METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations
    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
    if (!resources.contains(METRIC_AGGREGATORS)) {
      LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR));
      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
    }
    // This will set up the ideal state, it calculates the preference list for each partition similar to consistent hashing.
    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);

    // Start participant
    startAggregators();

    // Start controller
    startController();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      shutdownHAController();
    }));

    isInitialized = true;
  }

  /**
   * Registers this collector's instance config in the cluster unless an
   * instance with the same name is already listed there.
   */
  private void addParticipantInstanceIfAbsent() {
    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
    if (!nodes.contains(instanceConfig.getInstanceName())) {
      LOG.info(String.format("Adding participant instance %s", instanceConfig));
      admin.addInstance(CLUSTER_NAME, instanceConfig);
    }
  }