public ZKHelixManager()

in helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java [236:355]


  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
      String zkAddress, HelixManagerStateListener stateListener,
      HelixManagerProperty helixManagerProperty) {
    validateZkConnectionSettings(zkAddress, helixManagerProperty);

    _zkAddress = zkAddress;
    _clusterName = clusterName;
    _instanceType = instanceType;
    LOG.info("Create a zk-based cluster manager. ZK connection: " + getZkConnectionInfo()
        + ", clusterName: " + clusterName + ", instanceName: " + instanceName + ", type: "
        + instanceType);

    if (instanceName == null) {
      try {
        instanceName =
            InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
      } catch (UnknownHostException e) {
        // can ignore it
        LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
        instanceName = "UNKNOWN";
      }
    }

    _instanceName = instanceName;
    _enabledPipelineTypes =
        Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK);
    _preConnectCallbacks = new ArrayList<>();
    _handlers = new ArrayList<>();
    _properties = new HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION);
    _version = _properties.getVersion();

    _keyBuilder = new Builder(clusterName);
    _messagingService = new DefaultMessagingService(this);
    try {
      _callbackMonitors = new HashMap<>();
      for (ChangeType changeType : ChangeType.values()) {
        HelixCallbackMonitor callbackMonitor =
            new HelixCallbackMonitor(instanceType, clusterName, instanceName, changeType);
        callbackMonitor.register();
        _callbackMonitors.put(changeType, callbackMonitor);
      }
    } catch (JMException e) {
      LOG.error("Error in creating callback monitor.", e);
    }

    _stateListener = stateListener;
    // read cloud config from ZK and set cloudConfig in HelixManagerProperty
    _helixManagerProperty = helixManagerProperty;
    _helixManagerProperty.getHelixCloudProperty().populateFieldsWithCloudConfig(
        HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName,
            helixManagerProperty.getZkConnectionConfig()));

    /**
     * use system property if available
     */
    _flappingTimeWindowMs = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
        ZKHelixManager.FLAPPING_TIME_WINDOW);

    _maxDisconnectThreshold = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.MAX_DISCONNECT_THRESHOLD,
            ZKHelixManager.DEFAULT_MAX_DISCONNECT_THRESHOLD);

    _sessionTimeout = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT,
        HelixZkClient.DEFAULT_SESSION_TIMEOUT);

    _connectionInitTimeout = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT);

    _waitForConnectedTimeout = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
            DEFAULT_WAIT_CONNECTED_TIMEOUT);

    _reportLatency = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY,
            ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY);

    MonitorLevel configuredMonitorLevel;
    try {
      configuredMonitorLevel = MonitorLevel.valueOf(
          System.getProperty(SystemPropertyKeys.MONITOR_LEVEL, MonitorLevel.DEFAULT.name()));
    } catch (IllegalArgumentException ex) {
      LOG.warn("Unrecognizable Monitor Level configuration. Use DEFAULT monitor level.", ex);
      configuredMonitorLevel = MonitorLevel.DEFAULT;
    }
    _monitorLevel = configuredMonitorLevel;

    /**
     * instance type specific init
     */
    switch (instanceType) {
    case PARTICIPANT:
      _stateMachineEngine = new HelixStateMachineEngine(this);
      _participantHealthInfoCollector =
          new ParticipantHealthReportCollectorImpl(this, _instanceName);
      _timerTasks
          .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
      break;
    case CONTROLLER:
      _stateMachineEngine = null;
      _participantHealthInfoCollector = null;
      _controllerTimerTasks.add(new StatusDumpTask(this));
      break;
    case CONTROLLER_PARTICIPANT:
      _stateMachineEngine = new HelixStateMachineEngine(this);
      _participantHealthInfoCollector =
          new ParticipantHealthReportCollectorImpl(this, _instanceName);
      _timerTasks
          .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
      _controllerTimerTasks.add(new StatusDumpTask(this));
      break;
    case ADMINISTRATOR:
    case SPECTATOR:
      _stateMachineEngine = null;
      _participantHealthInfoCollector = null;
      break;
    default:
      throw new IllegalArgumentException("unrecognized type: " + instanceType);
    }
  }