public void maintainRegionReplicas()

in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java [1265:1474]


  /**
   * Dispatches the pending {@link RegionMaintainTask}s to their target DataNodes.
   *
   * <p>Nothing happens when the ConsensusManager is not available yet or when it does not report
   * itself as leader. Otherwise the pending tasks are grouped by region id (keeping their offered
   * order inside each region) and handled in rounds. Each round
   *
   * <ol>
   *   <li>selects the head task of every region that can share one batch with the first head
   *       found,
   *   <li>sends the batch to the DataNodes,
   *   <li>polls the successful head tasks from the local queues, and
   *   <li>persists that poll through a {@link PollSpecificRegionMaintainTaskPlan}.
   * </ol>
   *
   * <p>The rounds stop when no task succeeded, when only part of the batch succeeded, or when the
   * poll could not be persisted; the remaining tasks are left for the next scheduled invocation.
   */
  public void maintainRegionReplicas() {
    // The consensusManager of configManager may not be fully initialized at this time
    Optional.ofNullable(getConsensusManager())
        .ifPresent(
            consensusManager -> {
              // Use the instance that was just null-checked instead of fetching it again
              if (!consensusManager.isLeader()) {
                return;
              }

              List<RegionMaintainTask> regionMaintainTaskList =
                  partitionInfo.getRegionMaintainEntryList();
              if (regionMaintainTaskList.isEmpty()) {
                return;
              }

              Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap =
                  groupRegionMaintainTasksByRegion(regionMaintainTaskList);

              while (!regionMaintainTaskMap.isEmpty()) {
                List<RegionMaintainTask> selectedRegionMaintainTask =
                    selectCompatibleHeadTasks(regionMaintainTaskMap);
                if (selectedRegionMaintainTask.isEmpty()) {
                  break;
                }

                Set<TConsensusGroupId> successfulTask =
                    dispatchSelectedRegionMaintainTasks(selectedRegionMaintainTask);
                if (successfulTask.isEmpty()) {
                  break;
                }

                pollSuccessfulHeadTasks(regionMaintainTaskMap, successfulTask);

                // Poll the head entry if success
                try {
                  consensusManager.write(new PollSpecificRegionMaintainTaskPlan(successfulTask));
                } catch (ConsensusException e) {
                  LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
                  // The local queues are already advanced but the poll was not persisted.
                  // Stop here so that no further task is dispatched on top of a state the
                  // persisted task list does not reflect; the next schedule starts again
                  // from the persisted list.
                  break;
                }

                if (successfulTask.size() < selectedRegionMaintainTask.size()) {
                  // Here we just break and wait until next schedule task
                  // due to all the RegionMaintainEntry should be executed by
                  // the order of they were offered
                  break;
                }
              }
            });
  }

  /**
   * Groups the given tasks by region id. Tasks of the same region keep the relative order they
   * have in {@code regionMaintainTaskList}.
   *
   * @param regionMaintainTaskList the pending tasks, in the order they were offered
   * @return a mutable map from region id to the queue of tasks of that region
   */
  private Map<TConsensusGroupId, Queue<RegionMaintainTask>> groupRegionMaintainTasksByRegion(
      List<RegionMaintainTask> regionMaintainTaskList) {
    Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap = new HashMap<>();
    for (RegionMaintainTask regionMaintainTask : regionMaintainTaskList) {
      regionMaintainTaskMap
          .computeIfAbsent(regionMaintainTask.getRegionId(), k -> new LinkedList<>())
          .add(regionMaintainTask);
    }
    return regionMaintainTaskMap;
  }

  /**
   * Selects the head tasks that can be sent together in one batch.
   *
   * <p>The first head task found fixes the batch type. A further head task joins the batch when
   * it has the same type and either the type is DELETE or its region has the same consensus
   * group type as the first selected task (CREATE requests differ per region type).
   *
   * @param regionMaintainTaskMap the per-region task queues; not modified
   * @return the selected head tasks, empty when no queue has a head task
   */
  private List<RegionMaintainTask> selectCompatibleHeadTasks(
      Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap) {
    List<RegionMaintainTask> selectedRegionMaintainTask = new ArrayList<>();
    RegionMaintainType currentType = null;
    for (Map.Entry<TConsensusGroupId, Queue<RegionMaintainTask>> entry :
        regionMaintainTaskMap.entrySet()) {
      RegionMaintainTask headTask = entry.getValue().peek();
      if (headTask == null) {
        continue;
      }

      if (currentType == null) {
        currentType = headTask.getType();
        selectedRegionMaintainTask.add(headTask);
        continue;
      }

      if (currentType != headTask.getType()) {
        continue;
      }

      if (currentType == RegionMaintainType.DELETE
          || entry.getKey().getType()
              == selectedRegionMaintainTask.get(0).getRegionId().getType()) {
        // Delete or same create task
        selectedRegionMaintainTask.add(headTask);
      }
    }
    return selectedRegionMaintainTask;
  }

  /**
   * Sends the selected batch to the DataNodes according to the kind of its first task.
   *
   * @param selectedRegionMaintainTask a non-empty batch produced by {@code
   *     selectCompatibleHeadTasks}
   * @return the region ids whose task was answered with a success status; empty when the batch
   *     kind is not handled
   */
  private Set<TConsensusGroupId> dispatchSelectedRegionMaintainTasks(
      List<RegionMaintainTask> selectedRegionMaintainTask) {
    RegionMaintainTask firstTask = selectedRegionMaintainTask.get(0);
    switch (firstTask.getType()) {
      case CREATE:
        // create region
        switch (firstTask.getRegionId().getType()) {
          case SchemaRegion:
            // create SchemaRegion
            return sendCreateRegionRequests(
                selectedRegionMaintainTask,
                CnToDnAsyncRequestType.CREATE_SCHEMA_REGION,
                TConsensusGroupType.SchemaRegion,
                createTask ->
                    new TCreateSchemaRegionReq(
                        createTask.getRegionReplicaSet(), createTask.getStorageGroup()));
          case DataRegion:
            // Create DataRegion
            return sendCreateRegionRequests(
                selectedRegionMaintainTask,
                CnToDnAsyncRequestType.CREATE_DATA_REGION,
                TConsensusGroupType.DataRegion,
                createTask ->
                    new TCreateDataRegionReq(
                        createTask.getRegionReplicaSet(), createTask.getStorageGroup()));
          default:
            return new HashSet<>();
        }
      case DELETE:
        // delete region
        return sendDeleteRegionRequests(selectedRegionMaintainTask);
      default:
        return new HashSet<>();
    }
  }

  /**
   * Sends one create request per task and collects the regions that were created successfully.
   *
   * @param selectedRegionMaintainTask the batch; every element must be a {@link RegionCreateTask}
   * @param requestType the async request type used for the whole batch
   * @param regionType the consensus group type of the regions in the batch
   * @param requestBuilder builds the request payload for one create task
   * @param <Q> the request payload type
   * @return the ids of the regions whose response carries the success status code
   */
  private <Q> Set<TConsensusGroupId> sendCreateRegionRequests(
      List<RegionMaintainTask> selectedRegionMaintainTask,
      CnToDnAsyncRequestType requestType,
      TConsensusGroupType regionType,
      java.util.function.Function<RegionCreateTask, Q> requestBuilder) {
    DataNodeAsyncRequestContext<Q, TSStatus> createRegionHandler =
        new DataNodeAsyncRequestContext<>(requestType);
    for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) {
      RegionCreateTask regionCreateTask = (RegionCreateTask) regionMaintainTask;
      LOGGER.info(
          "Start to create Region: {} on DataNode: {}",
          regionCreateTask.getRegionReplicaSet().getRegionId(),
          regionCreateTask.getTargetDataNode());
      // Requests and target locations are keyed by the numeric region id
      createRegionHandler.putRequest(
          regionCreateTask.getRegionId().getId(), requestBuilder.apply(regionCreateTask));
      createRegionHandler.putNodeLocation(
          regionCreateTask.getRegionId().getId(), regionCreateTask.getTargetDataNode());
    }

    CnToDnInternalServiceAsyncRequestManager.getInstance()
        .sendAsyncRequestWithRetry(createRegionHandler);

    Set<TConsensusGroupId> successfulTask = new HashSet<>();
    for (Map.Entry<Integer, TSStatus> response :
        createRegionHandler.getResponseMap().entrySet()) {
      if (response.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        successfulTask.add(new TConsensusGroupId(regionType, response.getKey()));
      }
    }
    return successfulTask;
  }

  /**
   * Sends one delete request per task and collects the regions that were deleted successfully.
   *
   * @param selectedRegionMaintainTask the batch; every element must be a {@link RegionDeleteTask}
   * @return the ids of the regions whose response carries the success status code
   */
  private Set<TConsensusGroupId> sendDeleteRegionRequests(
      List<RegionMaintainTask> selectedRegionMaintainTask) {
    DataNodeAsyncRequestContext<TConsensusGroupId, TSStatus> deleteRegionHandler =
        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION);
    // Responses are keyed by the numeric id only, so remember the full region id per key
    Map<Integer, TConsensusGroupId> regionIdMap = new HashMap<>();
    for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) {
      RegionDeleteTask regionDeleteTask = (RegionDeleteTask) regionMaintainTask;
      LOGGER.info(
          "Start to delete Region: {} on DataNode: {}",
          regionDeleteTask.getRegionId(),
          regionDeleteTask.getTargetDataNode());
      deleteRegionHandler.putRequest(
          regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId());
      deleteRegionHandler.putNodeLocation(
          regionDeleteTask.getRegionId().getId(), regionDeleteTask.getTargetDataNode());
      regionIdMap.put(regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId());
    }

    long startTime = System.currentTimeMillis();
    CnToDnInternalServiceAsyncRequestManager.getInstance()
        .sendAsyncRequestWithRetry(deleteRegionHandler);

    LOGGER.info("Deleting regions costs {}ms", (System.currentTimeMillis() - startTime));

    Set<TConsensusGroupId> successfulTask = new HashSet<>();
    for (Map.Entry<Integer, TSStatus> response :
        deleteRegionHandler.getResponseMap().entrySet()) {
      if (response.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        successfulTask.add(regionIdMap.get(response.getKey()));
      }
    }
    return successfulTask;
  }

  /**
   * Removes the head task of every successful region from the local queues and drops queues that
   * become empty, so that the enclosing loop terminates once every region is drained.
   *
   * @param regionMaintainTaskMap the per-region task queues; modified in place
   * @param successfulTask the regions whose head task has been executed successfully
   * @throws IllegalStateException if a successful region has no queue in the map
   */
  private void pollSuccessfulHeadTasks(
      Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap,
      Set<TConsensusGroupId> successfulTask) {
    for (TConsensusGroupId regionId : successfulTask) {
      regionMaintainTaskMap.compute(
          regionId,
          (k, taskQueue) -> {
            if (taskQueue == null) {
              throw new IllegalStateException(
                  "No pending RegionMaintainTask queue found for Region: " + k);
            }
            taskQueue.poll();
            // Returning null removes the mapping of a drained region
            return taskQueue.isEmpty() ? null : taskQueue;
          });
    }
  }