public static synchronized void carryOverPreviousCurrentState()

in helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java [371:475]


  /**
   * Carries the current states stored under this instance's previous sessions over to the
   * current session and then removes the previous sessions' current-state nodes.
   *
   * <p>For every session of {@code instanceName} other than {@code sessionId}, each readable
   * current state is processed as follows:
   * <ul>
   *   <li>it is skipped if it has no state model definition reference, or if that reference is
   *       {@code TaskConstants.STATE_MODEL_NAME};</li>
   *   <li>when {@code setToInitState} is true, every partition is mapped to the initial state of
   *       the referenced state model definition; the resource is skipped (with an error log) if
   *       that definition cannot be read;</li>
   *   <li>otherwise every partition is mapped to the current state reported by its state model,
   *       as obtained from {@code stateMachineEngine}; partitions without a state model are left
   *       out of the map;</li>
   *   <li>the result is written to the current session's path through a
   *       {@code CurStateCarryOverUpdater}, bucket by bucket when the current state is
   *       bucketized.</li>
   * </ul>
   * Skipped resources are not carried over, but their previous-session nodes are still removed
   * by the final cleanup pass.
   *
   * @param dataAccessor accessor used to read previous current states and write the new ones
   * @param instanceName name of the participant instance whose sessions are inspected
   * @param sessionId id of the current session; its own current states are left untouched
   * @param stateMachineEngine source of the state model factories used when
   *        {@code setToInitState} is false
   * @param setToInitState if true, carry every partition over in the state model's initial state
   *        instead of the state reported by its in-memory state model
   * @throws ZkClientException if the current states of a previous session cannot be removed
   */
  public static synchronized void carryOverPreviousCurrentState(HelixDataAccessor dataAccessor,
      String instanceName, String sessionId, StateMachineEngine stateMachineEngine,
      boolean setToInitState) {
    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
    List<String> sessions = dataAccessor.getChildNames(keyBuilder.sessions(instanceName));

    for (String session : sessions) {
      if (session.equals(sessionId)) {
        continue;
      }

      // Ignore if any current states in the previous folder cannot be read.
      List<CurrentState> lastCurStates =
          dataAccessor.getChildValues(keyBuilder.currentStates(instanceName, session), false);

      for (CurrentState lastCurState : lastCurStates) {
        LOG.info("Carrying over old session: {}, resource: {} to current session: {}, "
            + "setToInitState: {}", session, lastCurState.getId(), sessionId, setToInitState);
        String stateModelDefRef = lastCurState.getStateModelDefRef();
        if (stateModelDefRef == null) {
          LOG.error(
              "skip carry-over because previous current state doesn't have a state model definition. previous current-state: {}",
              lastCurState);
          continue;
        }

        // If the current state is related to tasks, there is no need to carry it over to the new
        // session.
        // Note: this check is not necessary due to TaskCurrentStates, but keep it for backwards
        // compatibility
        if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
          continue;
        }

        Map<String, String> partitionExpectedStateMap;
        if (setToInitState) {
          // The definition is only needed for its initial state, so it is only read in this mode.
          StateModelDefinition stateModelDef =
              dataAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
          if (stateModelDef == null) {
            // Same policy as a missing definition reference above: skip instead of failing the
            // whole carry-over with a NullPointerException.
            LOG.error(
                "skip carry-over because state model definition {} cannot be found. previous current-state: {}",
                stateModelDefRef, lastCurState);
            continue;
          }
          String initState = stateModelDef.getInitialState();
          partitionExpectedStateMap = new HashMap<>();
          for (String partition : lastCurState.getPartitionStateMap().keySet()) {
            partitionExpectedStateMap.put(partition, initState);
          }
        } else {
          partitionExpectedStateMap =
              collectStatesFromStateModels(stateMachineEngine, stateModelDefRef, lastCurState);
        }

        String curStatePath =
            keyBuilder.currentState(instanceName, sessionId, lastCurState.getResourceName())
                .getPath();
        writeCarriedOverCurrentState(dataAccessor, curStatePath, sessionId,
            partitionExpectedStateMap, lastCurState);
      }
    }

    /**
     * remove previous current state parent nodes
     */
    for (String session : sessions) {
      if (session.equals(sessionId)) {
        continue;
      }

      PropertyKey currentStatesProperty = keyBuilder.currentStates(instanceName, session);
      String path = currentStatesProperty.getPath();
      LOG.info("Removing current states from previous sessions. path: {}", path);
      if (!dataAccessor.removeProperty(currentStatesProperty)) {
        throw new ZkClientException("Failed to delete " + path);
      }
    }
  }

  /**
   * Builds the partition-to-state map for a previous current state from the state models held by
   * the given engine. Partitions without a state model are omitted; if no factory is registered
   * for the state model at all, every partition is treated that way and the map is empty.
   */
  private static Map<String, String> collectStatesFromStateModels(
      StateMachineEngine stateMachineEngine, String stateModelDefRef,
      CurrentState lastCurState) {
    Map<String, String> partitionStates = new HashMap<>();
    String factoryName = lastCurState.getStateModelFactoryName();
    StateModelFactory<? extends StateModel> stateModelFactory =
        stateMachineEngine.getStateModelFactory(stateModelDefRef, factoryName);
    if (stateModelFactory == null) {
      LOG.error(
          "No state model factory found for state model: {}, factory: {}, resource: {}; no in-memory state is available to carry over",
          stateModelDefRef, factoryName, lastCurState.getResourceName());
      return partitionStates;
    }

    String resourceName = lastCurState.getResourceName();
    for (String partition : lastCurState.getPartitionStateMap().keySet()) {
      StateModel stateModel = stateModelFactory.getStateModel(resourceName, partition);
      if (stateModel != null) {
        partitionStates.put(partition, stateModel.getCurrentState());
      }
    }
    return partitionStates;
  }

  /**
   * Writes one carried-over current state to {@code curStatePath}: a single update when the
   * previous current state is not bucketized, otherwise the parent node followed by each bucket.
   */
  private static void writeCarriedOverCurrentState(HelixDataAccessor dataAccessor,
      String curStatePath, String sessionId, Map<String, String> partitionExpectedStateMap,
      CurrentState lastCurState) {
    BaseDataAccessor<ZNRecord> baseAccessor = dataAccessor.getBaseDataAccessor();

    if (lastCurState.getBucketSize() <= 0) {
      boolean updated = baseAccessor.update(curStatePath,
          new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, lastCurState),
          AccessOption.PERSISTENT);
      if (!updated) {
        LOG.error("Failed to carry over current state to path: {}", curStatePath);
      }
      return;
    }

    // update parent node; it carries only the simple fields of the previous current state
    ZNRecord metaRecord = new ZNRecord(lastCurState.getId());
    metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields());
    DataUpdater<ZNRecord> metaRecordUpdater = new CurStateCarryOverUpdater(sessionId,
        partitionExpectedStateMap, new CurrentState(metaRecord));
    boolean success = baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT);
    if (!success) {
      // Buckets are only written once the parent node exists.
      LOG.error("Failed to carry over bucketized current state parent node at path: {}",
          curStatePath);
      return;
    }

    // update current state buckets
    ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(lastCurState.getBucketSize());
    Map<String, ZNRecord> buckets = bucketizer.bucketize(lastCurState.getRecord());
    List<String> paths = new ArrayList<>(buckets.size());
    List<DataUpdater<ZNRecord>> updaters = new ArrayList<>(buckets.size());
    for (Map.Entry<String, ZNRecord> bucket : buckets.entrySet()) {
      paths.add(curStatePath + "/" + bucket.getKey());
      updaters.add(new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap,
          new CurrentState(bucket.getValue())));
    }

    baseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
  }