in helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java [790:986]
/**
 * Processes one cluster event by running, in order, every pipeline registered for its event type.
 *
 * <p>No pipeline is run, and the method returns early, when any of the following holds:
 * <ul>
 *   <li>the event carries no {@code helixmanager} attribute;</li>
 *   <li>the event has an {@code EVENT_SESSION} attribute whose value is empty or differs from the
 *       manager's current session id (the event is considered stale);</li>
 *   <li>the data provider is not a resource, workflow or management data provider;</li>
 *   <li>the kind of pipeline (management vs. default/task) disagrees with
 *       {@code _inManagementMode};</li>
 *   <li>the event's change context has type {@code FINALIZE} (periodic rebalance is stopped
 *       first).</li>
 * </ul>
 *
 * <p>Pipeline execution stops at the first pipeline that throws. If the exception is a
 * {@code HelixMetaDataAccessException}, a full refresh of the data provider is requested and,
 * when the corresponding event queue is empty, a {@code RetryRebalance} event is triggered
 * either immediately or after the delay returned by {@code getRetryDelay}.
 *
 * @param event the cluster event to process; several attributes are added to it along the way
 * @param dataProvider the data provider for this run; its concrete type selects which pipeline
 *     registry is consulted
 */
private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProvider) {
  final HelixManager eventManager = event.getAttribute(AttributeName.helixmanager.name());
  if (eventManager == null) {
    logger.error("No cluster manager in event:" + event.getEventType());
    return;
  }

  // Events are handled on a different thread from the one processing onControllerChange, so the
  // rebalancer fetched below can be observed in several states:
  //   1. Leadership already acquired: a valid rebalancer is available for this event.
  //   2. Leadership just relinquished, rebalancer not yet marked invalid: handled like case 1.
  //   3. Leftover event from a previous session, handled after leadership is regained: the
  //      rebalancer is reset before use, which is intended so that rebalance results stay
  //      consistent.
  //   4. Leadership just relinquished, rebalancer already marked invalid: the rebalancer is
  //      reset, but the pipeline is not expected to run for this event, so the reset rebalancer
  //      is not used before leadership is regained.
  //      NOTE(review): the original note attributes this to a later isLeader() check, which is
  //      not visible in this method (only the session id comparison below is) - confirm.
  event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
      _rebalancerRef.getRebalancer(eventManager));

  // Only events created in tests are expected to lack EVENT_SESSION; every event generated by
  // the controller in production should carry it. Missing sessions are tolerated so that tests
  // neither fail nor have to set EVENT_SESSION on every ClusterEvent.
  // TODO: may add EVENT_SESSION to tests' cluster events that need it
  Optional<String> eventSession = Optional.empty();
  if (event.containsAttribute(AttributeName.EVENT_SESSION.name())) {
    eventSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
    final String currentSession = eventManager.getSessionId();
    final boolean sessionMatches =
        eventSession.isPresent() && eventSession.get().equals(currentSession);
    if (!sessionMatches) {
      // The manager session changed since the event was created: the event is stale.
      logger.warn(
          "Controller pipeline is not invoked because event session doesn't match cluster " +
              "manager session. Event type: {}, id: {}, session: {}, actual manager session: "
              + "{}, instance: {}, cluster: {}", event.getEventType(), event.getEventId(),
          eventSession.orElse("NOT_PRESENT"), currentSession, eventManager.getInstanceName(),
          eventManager.getClusterName());
      return;
    }
  } else {
    logger.info("Event {} does not have event session attribute", event.getEventId());
  }

  _helixManager = eventManager;

  // Select the pipeline registry from the concrete data provider type.
  // TODO (harry): this is a temporary workaround - after controller is separated we should not
  // have this instanceof clauses
  final List<Pipeline> pipelinesToRun;
  final boolean taskFrameworkRun;
  final boolean managementRun;
  if (dataProvider instanceof ResourceControllerDataProvider) {
    pipelinesToRun = _registry.getPipelinesForEvent(event.getEventType());
    taskFrameworkRun = false;
    managementRun = false;
  } else if (dataProvider instanceof WorkflowControllerDataProvider) {
    pipelinesToRun = _taskRegistry.getPipelinesForEvent(event.getEventType());
    taskFrameworkRun = true;
    managementRun = false;
  } else if (dataProvider instanceof ManagementControllerDataProvider) {
    pipelinesToRun = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
    taskFrameworkRun = false;
    managementRun = true;
  } else {
    logger.warn(String
        .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
            event.getEventType(), event.getEventId()));
    return;
  }

  // Management-mode pipelines and default/task pipelines are mutually exclusive.
  if (_inManagementMode != managementRun) {
    logger.info("Should not run management mode and default/task pipelines at the same time. "
            + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}",
        eventManager.getClusterName(), _inManagementMode, managementRun, event.getEventType());
    return;
  }

  final NotificationContext changeContext =
      event.getAttribute(AttributeName.changeContext.name());
  if (changeContext != null) {
    if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
      stopPeriodRebalance();
      logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType());
      return;
    }
    // TODO: should be in the initialization of controller.
    if (_resourceControlDataProvider != null) {
      checkRebalancingTimer(eventManager, Collections.<IdealState>emptyList(),
          dataProvider.getClusterConfig());
    }
    if (_isMonitoring) {
      // The status monitor is enabled exactly when the controller is not in management mode,
      // and paused exactly when it is.
      _clusterStatusMonitor.setEnabled(!_inManagementMode);
      _clusterStatusMonitor.setPaused(_inManagementMode);
      event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
    }
  }

  dataProvider.setClusterEventId(event.getEventId());
  event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
  event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);

  logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. "
          + "Event session ID: {}", dataProvider.getPipelineName(), eventManager.getClusterName(),
      event.getEventType(), event.getEventId(), eventSession.orElse("NOT_PRESENT"));

  final long startTime = System.currentTimeMillis();
  boolean metadataAccessSucceeded = true;
  boolean allPipelinesSucceeded = true;
  for (Pipeline current : pipelinesToRun) {
    event.addAttribute(AttributeName.PipelineType.name(), current.getPipelineType());
    try {
      current.handle(event);
      current.finish();
    } catch (Exception e) {
      logger.error(
          "Exception while executing {} pipeline for cluster {}. Will not continue to next pipeline",
          dataProvider.getPipelineName(), _clusterName, e);

      if (e instanceof HelixMetaDataAccessException) {
        metadataAccessSucceeded = false;
        // A ZooKeeper read/write failure: force a full refresh so the pipeline can be retried.
        dataProvider.requireFullRefresh();
        logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName);

        // Push a retry event only if nothing else is pending in the corresponding event queue.
        if (isEventQueueEmpty(taskFrameworkRun)) {
          _continuousRebalanceFailureCount++;
          final long delay = getRetryDelay(_continuousRebalanceFailureCount);
          if (delay != 0) {
            _asyncTasksThreadPool
                .schedule(new RebalanceTask(eventManager, ClusterEventType.RetryRebalance), delay,
                    TimeUnit.MILLISECONDS);
          } else {
            forceRebalance(eventManager, ClusterEventType.RetryRebalance);
          }
          logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName);
        }
      }

      _clusterStatusMonitor.reportRebalanceFailure();
      updateContinuousRebalancedFailureCount(taskFrameworkRun, false /* resetToZero */);
      allPipelinesSucceeded = false;
      break;
    }
  }

  // The retry counter survives only across consecutive metadata-access failures.
  if (metadataAccessSucceeded) {
    _continuousRebalanceFailureCount = 0;
  }
  if (allPipelinesSucceeded) {
    updateContinuousRebalancedFailureCount(taskFrameworkRun, true /* resetToZero */);
  }

  _lastPipelineEndTimestamp = System.currentTimeMillis();
  logger.info("END: Invoking {} controller pipeline for event {}::{} for cluster {}, took {} ms",
      dataProvider.getPipelineName(), event.getEventType(), event.getEventId(), _clusterName,
      _lastPipelineEndTimestamp - startTime);

  if (!taskFrameworkRun) {
    // Report how long the event spent in each processing phase.
    final NotificationContext timingContext =
        event.getAttribute(AttributeName.changeContext.name());
    final long enqueueTime = event.getCreationTime();
    final StringBuilder durationReport = new StringBuilder();

    if (timingContext != null) {
      // Time between creation of the notification context and creation of the event.
      final long callbackMillis = enqueueTime - timingContext.getCreationTime();
      if (_isMonitoring) {
        _clusterStatusMonitor
            .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
                callbackMillis);
      }
      durationReport.append(String
          .format("Callback time for event: %s took: %s ms\n", event.getEventType(),
              callbackMillis));
    }

    // Time between creation of the event and the start of pipeline execution.
    final long inQueueMillis = startTime - enqueueTime;
    if (_isMonitoring) {
      _clusterStatusMonitor
          .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(),
              inQueueMillis);
      _clusterStatusMonitor
          .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
              _lastPipelineEndTimestamp - startTime);
    }
    durationReport.append(String
        .format("InQueue time for event: %s took: %s ms\n", event.getEventType(),
            inQueueMillis));
    durationReport.append(String
        .format("TotalProcessed time for event: %s took: %s ms", event.getEventType(),
            _lastPipelineEndTimestamp - startTime));
    logger.info(durationReport.toString());
  }

  // If the event was handled before the controller got deactivated, the processing above may
  // have written unnecessary MBeans to monitoring after the monitor was disabled, so the
  // ClusterStatusMonitor is reset according to its state once all event handling is done.
  // TODO remove this once clusterStatusMonitor blocks any MBean register on isMonitoring = false.
  resetClusterStatusMonitor();
}