private void handleEvent()

in helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java [790:986]


  /**
   * Runs the controller pipelines registered for {@code event} against {@code dataProvider}.
   *
   * <p>The event is dropped (a message is logged and no pipeline runs) when any of the following
   * holds:
   * <ul>
   *   <li>the event has no {@code helixmanager} attribute;</li>
   *   <li>the event has an {@code EVENT_SESSION} attribute whose value is empty or differs from
   *       the manager's current session ID;</li>
   *   <li>{@code dataProvider} is not a resource, workflow or management data provider;</li>
   *   <li>the selected pipeline kind (management vs. default/task) does not match
   *       {@code _inManagementMode};</li>
   *   <li>the event's change context is of type {@code FINALIZE}, in which case
   *       {@code stopPeriodRebalance()} is called first.</li>
   * </ul>
   *
   * <p>Otherwise the pipelines are executed in order. The first exception stops the remaining
   * pipelines and is reported as a rebalance failure; for a {@link HelixMetaDataAccessException}
   * the data provider is additionally asked for a full refresh and, if the corresponding event
   * queue is empty, a {@code RetryRebalance} is triggered immediately or scheduled with a delay.
   * For non-task pipelines the callback / in-queue / total processing durations are logged and,
   * when monitoring is on, reported to the cluster status monitor.
   *
   * @param event the cluster event to process; attributes are added to it as a side effect
   * @param dataProvider the data provider whose concrete type selects the pipeline registry
   */
  private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProvider) {
    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
    if (manager == null) {
      logger.error("No cluster manager in event:{}", event.getEventType());
      return;
    }

    // Event handling happens in a different thread from the onControllerChange processing thread.
    // Thus, there are several possible conditions.
    // 1. Event handled after leadership acquired. So we will have a valid rebalancer for the
    // event processing.
    // 2. Event handled shortly after leadership relinquished. And the rebalancer has not been
    // marked as invalid yet. So the event will be processed the same as case one.
    // 3. Event is leftover from the previous session, and it is handled when the controller
    // regains the leadership. The rebalancer will be reset before being used. That is the
    // expected behavior so as to avoid inconsistent rebalance result.
    // 4. Event handled shortly after leadership relinquished. And the rebalancer has been marked
    // as invalid. So we reset the rebalancer. But the later isLeader() check will return false and
    // the pipeline will not be triggered. So the reset rebalancer won't be used before the
    // controller regains leadership.
    // NOTE(review): no isLeader() call is visible in this method; the event-session comparison
    // below appears to be the check that drops such events - confirm.
    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
        _rebalancerRef.getRebalancer(manager));

    Optional<String> eventSessionId = Optional.empty();
    // Only events created in tests are expected to lack EVENT_SESSION; every event generated by
    // the controller in prod should carry it. Tolerating its absence avoids failing tests or
    // having to add EVENT_SESSION to every ClusterEvent built in tests.
    // TODO: may add EVENT_SESSION to tests' cluster events that need it
    if (!event.containsAttribute(AttributeName.EVENT_SESSION.name())) {
      logger.info("Event {} does not have event session attribute", event.getEventId());
    } else {
      eventSessionId = event.getAttribute(AttributeName.EVENT_SESSION.name());
      String managerSessionId = manager.getSessionId();

      // If manager session changes, no need to run pipeline for the stale event.
      if (!eventSessionId.isPresent() || !eventSessionId.get().equals(managerSessionId)) {
        logger.warn(
            "Controller pipeline is not invoked because event session doesn't match cluster " +
                "manager session. Event type: {}, id: {}, session: {}, actual manager session: "
                + "{}, instance: {}, cluster: {}", event.getEventType(), event.getEventId(),
            eventSessionId.orElse("NOT_PRESENT"), managerSessionId, manager.getInstanceName(),
            manager.getClusterName());
        return;
      }
    }

    _helixManager = manager;

    // Prepare ClusterEvent
    // TODO (harry): this is a temporary workaround - after controller is separated we should not
    // have this instanceof clauses
    List<Pipeline> pipelines;
    boolean isTaskFrameworkPipeline = false;
    boolean isManagementPipeline = false;

    if (dataProvider instanceof ResourceControllerDataProvider) {
      pipelines = _registry.getPipelinesForEvent(event.getEventType());
    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
      pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
      isTaskFrameworkPipeline = true;
    } else if (dataProvider instanceof ManagementControllerDataProvider) {
      pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
      isManagementPipeline = true;
    } else {
      // A null dataProvider fails every instanceof test above and ends up here, so it must not be
      // dereferenced unguarded while building the message.
      logger.warn("No {} pipeline to run for event: {}::{}",
          dataProvider == null ? null : dataProvider.getPipelineName(), event.getEventType(),
          event.getEventId());
      return;
    }

    // Should not run management mode and default/task pipelines at the same time.
    if (_inManagementMode != isManagementPipeline) {
      logger.info("Should not run management mode and default/task pipelines at the same time. "
              + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}",
          manager.getClusterName(), _inManagementMode, isManagementPipeline, event.getEventType());
      return;
    }

    NotificationContext context = event.getAttribute(AttributeName.changeContext.name());
    if (context != null) {
      if (context.getType() == NotificationContext.Type.FINALIZE) {
        stopPeriodRebalance();
        logger.info("Get FINALIZE notification, skip the pipeline. Event :{}",
            event.getEventType());
        return;
      }

      // TODO: should be in the initialization of controller.
      if (_resourceControlDataProvider != null) {
        checkRebalancingTimer(manager, Collections.<IdealState>emptyList(),
            dataProvider.getClusterConfig());
      }
      if (_isMonitoring) {
        // The monitor is enabled and un-paused only while the controller is not in management mode.
        _clusterStatusMonitor.setEnabled(!_inManagementMode);
        _clusterStatusMonitor.setPaused(_inManagementMode);
        event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
      }
    }

    dataProvider.setClusterEventId(event.getEventId());
    event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
    event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);

    logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. "
            + "Event session ID: {}", dataProvider.getPipelineName(), manager.getClusterName(),
        event.getEventType(), event.getEventId(), eventSessionId.orElse("NOT_PRESENT"));

    long startTime = System.currentTimeMillis();
    boolean helixMetaDataAccessRebalanceFail = false;
    boolean rebalanceFail = false;
    for (Pipeline pipeline : pipelines) {
      event.addAttribute(AttributeName.PipelineType.name(), pipeline.getPipelineType());
      try {
        pipeline.handle(event);
        pipeline.finish();
      } catch (Exception e) {
        logger.error(
            "Exception while executing {} pipeline for cluster {}. Will not continue to next pipeline",
            dataProvider.getPipelineName(), _clusterName, e);
        if (e instanceof HelixMetaDataAccessException) {
          helixMetaDataAccessRebalanceFail = true;
          // If pipeline failed due to read/write fails to zookeeper, retry the pipeline.
          dataProvider.requireFullRefresh();
          logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: {}",
              _clusterName);

          // only push a retry event when there is no pending event in the corresponding event queue.
          if (isEventQueueEmpty(isTaskFrameworkPipeline)) {
            _continuousRebalanceFailureCount++;
            long delay = getRetryDelay(_continuousRebalanceFailureCount);
            if (delay == 0) {
              forceRebalance(manager, ClusterEventType.RetryRebalance);
            } else {
              _asyncTasksThreadPool
                  .schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay,
                      TimeUnit.MILLISECONDS);
            }
            logger.info("Retry rebalance pipeline with delay {}ms for cluster: {}", delay,
                _clusterName);
          }
        }
        _clusterStatusMonitor.reportRebalanceFailure();
        updateContinuousRebalancedFailureCount(isTaskFrameworkPipeline, false /*resetToZero*/);
        rebalanceFail = true;
        break;
      }
    }
    // The metadata-access retry counter is only kept across runs that failed with a
    // HelixMetaDataAccessException; any other outcome (success or a different failure) resets it.
    if (!helixMetaDataAccessRebalanceFail) {
      _continuousRebalanceFailureCount = 0;
    }
    if (!rebalanceFail) {
      updateContinuousRebalancedFailureCount(isTaskFrameworkPipeline, true /*resetToZero*/);
    }

    // Capture the end time once in a local so that every duration logged or reported below refers
    // to the same instant, even if the field is updated elsewhere in the meantime.
    long endTime = System.currentTimeMillis();
    _lastPipelineEndTimestamp = endTime;
    long totalProcessedDuration = endTime - startTime;
    logger.info("END: Invoking {} controller pipeline for event {}::{} for cluster {}, took {} ms",
        dataProvider.getPipelineName(), event.getEventType(), event.getEventId(), _clusterName,
        totalProcessedDuration);

    if (!isTaskFrameworkPipeline) {
      // report event process durations
      // The change context is read again here, after the pipelines ran, as the original code does.
      NotificationContext notificationContext =
          event.getAttribute(AttributeName.changeContext.name());
      long enqueueTime = event.getCreationTime();
      long inQueueDuration = startTime - enqueueTime;
      StringBuilder sb = new StringBuilder();
      if (notificationContext != null) {
        long callbackDuration = enqueueTime - notificationContext.getCreationTime();
        if (_isMonitoring) {
          _clusterStatusMonitor
              .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
                  callbackDuration);
        }
        sb.append(String.format("Callback time for event: %s took: %s ms\n", event.getEventType(),
            callbackDuration));
      }
      if (_isMonitoring) {
        _clusterStatusMonitor
            .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(),
                inQueueDuration);
        _clusterStatusMonitor
            .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
                totalProcessedDuration);
      }
      sb.append(String.format("InQueue time for event: %s took: %s ms\n", event.getEventType(),
          inQueueDuration));
      sb.append(String.format("TotalProcessed time for event: %s took: %s ms", event.getEventType(),
          totalProcessedDuration));
      logger.info(sb.toString());
    }

    // If event handling happens before controller deactivate, the process may write unnecessary
    // MBeans to monitoring after the monitor is disabled.
    // So reset ClusterStatusMonitor according to its status after all event handling.
    // TODO remove this once clusterStatusMonitor blocks any MBean register on isMonitoring = false.
    resetClusterStatusMonitor();
  }