in helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java [371:475]
public static synchronized void carryOverPreviousCurrentState(HelixDataAccessor dataAccessor,
String instanceName, String sessionId, StateMachineEngine stateMachineEngine,
boolean setToInitState) {
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
List<String> sessions = dataAccessor.getChildNames(keyBuilder.sessions(instanceName));
for (String session : sessions) {
if (session.equals(sessionId)) {
continue;
}
// Ignore if any current states in the previous folder cannot be read.
List<CurrentState> lastCurStates =
dataAccessor.getChildValues(keyBuilder.currentStates(instanceName, session), false);
for (CurrentState lastCurState : lastCurStates) {
LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
+ " to current session: " + sessionId + ", setToInitState: " + setToInitState);
String stateModelDefRef = lastCurState.getStateModelDefRef();
if (stateModelDefRef == null) {
LOG.error(
"skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
+ lastCurState);
continue;
}
// If the the current state is related to tasks, there is no need to carry it over to new session.
// Note: this check is not necessary due to TaskCurrentStates, but keep it for backwards compatibility
if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
continue;
}
StateModelDefinition stateModelDef =
dataAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
String initState = stateModelDef.getInitialState();
Map<String, String> partitionExpectedStateMap = new HashMap<>();
if (setToInitState) {
lastCurState.getPartitionStateMap().keySet()
.forEach(partition -> partitionExpectedStateMap.put(partition, initState));
} else {
String factoryName = lastCurState.getStateModelFactoryName();
StateModelFactory<? extends StateModel> stateModelFactory =
stateMachineEngine.getStateModelFactory(stateModelDefRef, factoryName);
lastCurState.getPartitionStateMap().keySet().forEach(partition -> {
StateModel stateModel =
stateModelFactory.getStateModel(lastCurState.getResourceName(), partition);
if (stateModel != null) {
partitionExpectedStateMap.put(partition, stateModel.getCurrentState());
}
});
}
BaseDataAccessor<ZNRecord> baseAccessor = dataAccessor.getBaseDataAccessor();
String curStatePath =
keyBuilder.currentState(instanceName, sessionId, lastCurState.getResourceName())
.getPath();
if (lastCurState.getBucketSize() > 0) {
// update parent node
ZNRecord metaRecord = new ZNRecord(lastCurState.getId());
metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields());
DataUpdater<ZNRecord> metaRecordUpdater =
new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, new CurrentState(metaRecord));
boolean success =
baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT);
if (success) {
// update current state buckets
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(lastCurState.getBucketSize());
Map<String, ZNRecord> map = bucketizer.bucketize(lastCurState.getRecord());
List<String> paths = new ArrayList<String>();
List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
for (String bucketName : map.keySet()) {
paths.add(curStatePath + "/" + bucketName);
updaters.add(new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, new CurrentState(map
.get(bucketName))));
}
baseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
}
} else {
dataAccessor.getBaseDataAccessor().update(curStatePath,
new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, lastCurState),
AccessOption.PERSISTENT);
}
}
}
/**
* remove previous current state parent nodes
*/
for (String session : sessions) {
if (session.equals(sessionId)) {
continue;
}
PropertyKey currentStatesProperty = keyBuilder.currentStates(instanceName, session);
String path = currentStatesProperty.getPath();
LOG.info("Removing current states from previous sessions. path: {}", path);
if (!dataAccessor.removeProperty(currentStatesProperty)) {
throw new ZkClientException("Failed to delete " + path);
}
}
}