in helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java [236:355]
public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
String zkAddress, HelixManagerStateListener stateListener,
HelixManagerProperty helixManagerProperty) {
validateZkConnectionSettings(zkAddress, helixManagerProperty);
_zkAddress = zkAddress;
_clusterName = clusterName;
_instanceType = instanceType;
LOG.info("Create a zk-based cluster manager. ZK connection: " + getZkConnectionInfo()
+ ", clusterName: " + clusterName + ", instanceName: " + instanceName + ", type: "
+ instanceType);
if (instanceName == null) {
try {
instanceName =
InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
} catch (UnknownHostException e) {
// can ignore it
LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
instanceName = "UNKNOWN";
}
}
_instanceName = instanceName;
_enabledPipelineTypes =
Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK);
_preConnectCallbacks = new ArrayList<>();
_handlers = new ArrayList<>();
_properties = new HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION);
_version = _properties.getVersion();
_keyBuilder = new Builder(clusterName);
_messagingService = new DefaultMessagingService(this);
try {
_callbackMonitors = new HashMap<>();
for (ChangeType changeType : ChangeType.values()) {
HelixCallbackMonitor callbackMonitor =
new HelixCallbackMonitor(instanceType, clusterName, instanceName, changeType);
callbackMonitor.register();
_callbackMonitors.put(changeType, callbackMonitor);
}
} catch (JMException e) {
LOG.error("Error in creating callback monitor.", e);
}
_stateListener = stateListener;
// read cloud config from ZK and set cloudConfig in HelixManagerProperty
_helixManagerProperty = helixManagerProperty;
_helixManagerProperty.getHelixCloudProperty().populateFieldsWithCloudConfig(
HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName,
helixManagerProperty.getZkConnectionConfig()));
/**
* use system property if available
*/
_flappingTimeWindowMs = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
ZKHelixManager.FLAPPING_TIME_WINDOW);
_maxDisconnectThreshold = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.MAX_DISCONNECT_THRESHOLD,
ZKHelixManager.DEFAULT_MAX_DISCONNECT_THRESHOLD);
_sessionTimeout = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_SESSION_TIMEOUT);
_connectionInitTimeout = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT);
_waitForConnectedTimeout = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
DEFAULT_WAIT_CONNECTED_TIMEOUT);
_reportLatency = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY,
ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY);
MonitorLevel configuredMonitorLevel;
try {
configuredMonitorLevel = MonitorLevel.valueOf(
System.getProperty(SystemPropertyKeys.MONITOR_LEVEL, MonitorLevel.DEFAULT.name()));
} catch (IllegalArgumentException ex) {
LOG.warn("Unrecognizable Monitor Level configuration. Use DEFAULT monitor level.", ex);
configuredMonitorLevel = MonitorLevel.DEFAULT;
}
_monitorLevel = configuredMonitorLevel;
/**
* instance type specific init
*/
switch (instanceType) {
case PARTICIPANT:
_stateMachineEngine = new HelixStateMachineEngine(this);
_participantHealthInfoCollector =
new ParticipantHealthReportCollectorImpl(this, _instanceName);
_timerTasks
.add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
break;
case CONTROLLER:
_stateMachineEngine = null;
_participantHealthInfoCollector = null;
_controllerTimerTasks.add(new StatusDumpTask(this));
break;
case CONTROLLER_PARTICIPANT:
_stateMachineEngine = new HelixStateMachineEngine(this);
_participantHealthInfoCollector =
new ParticipantHealthReportCollectorImpl(this, _instanceName);
_timerTasks
.add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
_controllerTimerTasks.add(new StatusDumpTask(this));
break;
case ADMINISTRATOR:
case SPECTATOR:
_stateMachineEngine = null;
_participantHealthInfoCollector = null;
break;
default:
throw new IllegalArgumentException("unrecognized type: " + instanceType);
}
}