in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java [1265:1474]
/**
 * Executes the pending region maintain tasks (region creation and region deletion on DataNodes).
 *
 * <p>Only runs when the consensus manager reports this node as leader. Tasks are grouped by region
 * id and each region's tasks are executed strictly in the order they were offered. Every round
 * takes at most the head task of each region, all sharing one {@link RegionMaintainType} (and, for
 * CREATE, one consensus group type), sends them as one batch of asynchronous requests, and then
 * removes the successful tasks both from the local queues and, by writing a {@link
 * PollSpecificRegionMaintainTaskPlan} through consensus, from the task list held by {@code
 * partitionInfo}.
 *
 * <p>The loop stops as soon as a round is not fully successful or the poll cannot be written
 * through consensus; the remaining tasks are left for the next invocation.
 */
public void maintainRegionReplicas() {
  // The consensusManager of configManager may not be fully initialized at this time
  Optional.ofNullable(getConsensusManager())
      .ifPresent(
          consensusManager -> {
            if (!consensusManager.isLeader()) {
              return;
            }
            Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap =
                groupRegionMaintainTasksByRegionId(partitionInfo.getRegionMaintainEntryList());
            while (!regionMaintainTaskMap.isEmpty()) {
              List<RegionMaintainTask> selectedRegionMaintainTask =
                  selectSameTypeRegionMaintainTasks(regionMaintainTaskMap);
              if (selectedRegionMaintainTask.isEmpty()) {
                break;
              }
              Set<TConsensusGroupId> successfulTask =
                  executeRegionMaintainTasks(selectedRegionMaintainTask);
              if (successfulTask.isEmpty()) {
                break;
              }
              pollLocalRegionMaintainTasks(regionMaintainTaskMap, successfulTask);
              // Poll the head entry if success
              try {
                consensusManager.write(new PollSpecificRegionMaintainTaskPlan(successfulTask));
              } catch (ConsensusException e) {
                LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
                // The poll was not recorded through consensus, so the local queues no longer
                // match the shared task list. Continuing would run later tasks whose poll
                // would then remove the wrong head entries; stop and let the next schedule
                // reload the list and retry from its head.
                break;
              }
              if (successfulTask.size() < selectedRegionMaintainTask.size()) {
                // Here we just break and wait until next schedule task
                // due to all the RegionMaintainEntry should be executed by
                // the order of they were offered
                break;
              }
            }
          });
}

/**
 * Groups region maintain tasks by region id, keeping each region's tasks in list order.
 *
 * @param regionMaintainTaskList the pending tasks in the order they were offered
 * @return a mutable map from region id to a FIFO queue of that region's tasks
 */
private static Map<TConsensusGroupId, Queue<RegionMaintainTask>>
    groupRegionMaintainTasksByRegionId(List<RegionMaintainTask> regionMaintainTaskList) {
  Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap = new HashMap<>();
  for (RegionMaintainTask regionMaintainTask : regionMaintainTaskList) {
    regionMaintainTaskMap
        .computeIfAbsent(regionMaintainTask.getRegionId(), k -> new LinkedList<>())
        .add(regionMaintainTask);
  }
  return regionMaintainTaskMap;
}

/**
 * Selects at most the head task of every region so that all selected tasks share the same {@link
 * RegionMaintainType}; CREATE tasks must additionally target the same consensus group type. The
 * type of the first non-empty queue visited decides the batch. Queues are only peeked, not polled.
 *
 * @param regionMaintainTaskMap per-region task queues
 * @return the selected head tasks; empty when every queue is empty
 */
private static List<RegionMaintainTask> selectSameTypeRegionMaintainTasks(
    Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap) {
  List<RegionMaintainTask> selectedRegionMaintainTask = new ArrayList<>();
  for (Queue<RegionMaintainTask> regionTasks : regionMaintainTaskMap.values()) {
    RegionMaintainTask headTask = regionTasks.peek();
    if (headTask == null) {
      continue;
    }
    if (selectedRegionMaintainTask.isEmpty()) {
      selectedRegionMaintainTask.add(headTask);
      continue;
    }
    RegionMaintainTask firstTask = selectedRegionMaintainTask.get(0);
    if (!firstTask.getType().equals(headTask.getType())) {
      continue;
    }
    if (firstTask.getType().equals(RegionMaintainType.DELETE)
        || headTask.getRegionId().getType().equals(firstTask.getRegionId().getType())) {
      // Delete or same create task
      selectedRegionMaintainTask.add(headTask);
    }
  }
  return selectedRegionMaintainTask;
}

/**
 * Dispatches a non-empty batch of same-type tasks to the matching request sender.
 *
 * @param selectedRegionMaintainTask tasks produced by {@code selectSameTypeRegionMaintainTasks}
 * @return ids of the regions whose task succeeded; empty when nothing was sent
 */
private Set<TConsensusGroupId> executeRegionMaintainTasks(
    List<RegionMaintainTask> selectedRegionMaintainTask) {
  RegionMaintainTask firstTask = selectedRegionMaintainTask.get(0);
  switch (firstTask.getType()) {
    case CREATE:
      switch (firstTask.getRegionId().getType()) {
        case SchemaRegion:
          return executeSchemaRegionCreateTasks(selectedRegionMaintainTask);
        case DataRegion:
          return executeDataRegionCreateTasks(selectedRegionMaintainTask);
        default:
          // No create request is sent for other consensus group types
          return new HashSet<>();
      }
    case DELETE:
      return executeRegionDeleteTasks(selectedRegionMaintainTask);
    default:
      return new HashSet<>();
  }
}

/**
 * Sends one CREATE_SCHEMA_REGION request per task (with retry) and collects the successes.
 *
 * @param schemaRegionCreateTasks tasks that are all {@link RegionCreateTask}s for SchemaRegions
 * @return SchemaRegion ids whose creation returned SUCCESS_STATUS
 */
private Set<TConsensusGroupId> executeSchemaRegionCreateTasks(
    List<RegionMaintainTask> schemaRegionCreateTasks) {
  DataNodeAsyncRequestContext<TCreateSchemaRegionReq, TSStatus> createSchemaRegionHandler =
      new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CREATE_SCHEMA_REGION);
  for (RegionMaintainTask regionMaintainTask : schemaRegionCreateTasks) {
    RegionCreateTask schemaRegionCreateTask = (RegionCreateTask) regionMaintainTask;
    LOGGER.info(
        "Start to create Region: {} on DataNode: {}",
        schemaRegionCreateTask.getRegionReplicaSet().getRegionId(),
        schemaRegionCreateTask.getTargetDataNode());
    createSchemaRegionHandler.putRequest(
        schemaRegionCreateTask.getRegionId().getId(),
        new TCreateSchemaRegionReq(
            schemaRegionCreateTask.getRegionReplicaSet(),
            schemaRegionCreateTask.getStorageGroup()));
    createSchemaRegionHandler.putNodeLocation(
        schemaRegionCreateTask.getRegionId().getId(),
        schemaRegionCreateTask.getTargetDataNode());
  }
  CnToDnInternalServiceAsyncRequestManager.getInstance()
      .sendAsyncRequestWithRetry(createSchemaRegionHandler);
  Set<TConsensusGroupId> successfulTask = new HashSet<>();
  for (Map.Entry<Integer, TSStatus> entry :
      createSchemaRegionHandler.getResponseMap().entrySet()) {
    if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      successfulTask.add(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, entry.getKey()));
    }
  }
  return successfulTask;
}

/**
 * Sends one CREATE_DATA_REGION request per task (with retry) and collects the successes.
 *
 * @param dataRegionCreateTasks tasks that are all {@link RegionCreateTask}s for DataRegions
 * @return DataRegion ids whose creation returned SUCCESS_STATUS
 */
private Set<TConsensusGroupId> executeDataRegionCreateTasks(
    List<RegionMaintainTask> dataRegionCreateTasks) {
  DataNodeAsyncRequestContext<TCreateDataRegionReq, TSStatus> createDataRegionHandler =
      new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CREATE_DATA_REGION);
  for (RegionMaintainTask regionMaintainTask : dataRegionCreateTasks) {
    RegionCreateTask dataRegionCreateTask = (RegionCreateTask) regionMaintainTask;
    LOGGER.info(
        "Start to create Region: {} on DataNode: {}",
        dataRegionCreateTask.getRegionReplicaSet().getRegionId(),
        dataRegionCreateTask.getTargetDataNode());
    createDataRegionHandler.putRequest(
        dataRegionCreateTask.getRegionId().getId(),
        new TCreateDataRegionReq(
            dataRegionCreateTask.getRegionReplicaSet(), dataRegionCreateTask.getStorageGroup()));
    createDataRegionHandler.putNodeLocation(
        dataRegionCreateTask.getRegionId().getId(), dataRegionCreateTask.getTargetDataNode());
  }
  CnToDnInternalServiceAsyncRequestManager.getInstance()
      .sendAsyncRequestWithRetry(createDataRegionHandler);
  Set<TConsensusGroupId> successfulTask = new HashSet<>();
  for (Map.Entry<Integer, TSStatus> entry :
      createDataRegionHandler.getResponseMap().entrySet()) {
    if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      successfulTask.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, entry.getKey()));
    }
  }
  return successfulTask;
}

/**
 * Sends one DELETE_REGION request per task (with retry), logs the elapsed time and collects the
 * successes.
 *
 * @param regionDeleteTasks tasks that are all {@link RegionDeleteTask}s
 * @return ids of the regions whose deletion returned SUCCESS_STATUS
 */
private Set<TConsensusGroupId> executeRegionDeleteTasks(
    List<RegionMaintainTask> regionDeleteTasks) {
  DataNodeAsyncRequestContext<TConsensusGroupId, TSStatus> deleteRegionHandler =
      new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION);
  // Responses are keyed by the integer region id; remember the full id to map them back
  Map<Integer, TConsensusGroupId> regionIdMap = new HashMap<>();
  for (RegionMaintainTask regionMaintainTask : regionDeleteTasks) {
    RegionDeleteTask regionDeleteTask = (RegionDeleteTask) regionMaintainTask;
    LOGGER.info(
        "Start to delete Region: {} on DataNode: {}",
        regionDeleteTask.getRegionId(),
        regionDeleteTask.getTargetDataNode());
    deleteRegionHandler.putRequest(
        regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId());
    deleteRegionHandler.putNodeLocation(
        regionDeleteTask.getRegionId().getId(), regionDeleteTask.getTargetDataNode());
    regionIdMap.put(regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId());
  }
  long startTime = System.currentTimeMillis();
  CnToDnInternalServiceAsyncRequestManager.getInstance()
      .sendAsyncRequestWithRetry(deleteRegionHandler);
  LOGGER.info("Deleting regions costs {}ms", (System.currentTimeMillis() - startTime));
  Set<TConsensusGroupId> successfulTask = new HashSet<>();
  for (Map.Entry<Integer, TSStatus> entry : deleteRegionHandler.getResponseMap().entrySet()) {
    if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      successfulTask.add(regionIdMap.get(entry.getKey()));
    }
  }
  return successfulTask;
}

/**
 * Removes the head task of every successful region from the local queues, dropping queues that
 * become empty.
 *
 * @throws IllegalStateException if a successful region id has no local queue
 */
private static void pollLocalRegionMaintainTasks(
    Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap,
    Set<TConsensusGroupId> successfulTask) {
  for (TConsensusGroupId regionId : successfulTask) {
    regionMaintainTaskMap.compute(
        regionId,
        (k, v) -> {
          if (v == null) {
            throw new IllegalStateException(
                "No pending region maintain task for region " + regionId);
          }
          v.poll();
          // Returning null removes the mapping once the region has no more tasks
          return v.isEmpty() ? null : v;
        });
  }
}