private boolean initializeFromFilesystem()

in hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java [361:516]


  /**
   * Initializes the metadata table (MDT) partitions listed in {@code partitionsToInit}, issuing one
   * bulk commit per partition.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Bail out with {@code false} if {@code shouldInitializeFromFilesystem} rejects the current
   *       set of pending data instants.</li>
   *   <li>If the table config does not report FILES as available, move FILES to the head of the list
   *       and create the MDT meta client; otherwise open the metadata reader and lazily build the
   *       meta client.</li>
   *   <li>Drop every partition the table config already reports as available.</li>
   *   <li>Obtain the partition/file listing from the MDT (FILES available), from the file system
   *       (auto-initialize enabled), or use an empty listing.</li>
   *   <li>Drop PARTITION_STATS when the data table is not partitioned.</li>
   *   <li>For each remaining partition: build its records, create the file groups, bulk commit,
   *       flag the partition as available in the table config and re-open the metadata reader.</li>
   * </ol>
   *
   * <p>Note that {@code partitionsToInit} is modified in place.
   *
   * @param dataTableInstantTime     instant time handed to the listing/initialization helpers and to
   *                                 {@code generateUniqueInstantTime} for each partition
   * @param partitionsToInit         mutable list of partition types requested for initialization
   * @param inflightInstantTimestamp forwarded to {@code shouldInitializeFromFilesystem}
   * @return {@code false} when initialization is skipped up front, {@code true} once the loop over
   *         the partitions has completed
   * @throws IOException             declared for the helpers invoked here
   * @throws HoodieMetadataException when building the records of any partition fails
   */
  private boolean initializeFromFilesystem(String dataTableInstantTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
    final Set<String> pendingInstants = getPendingDataInstants(dataMetaClient);
    if (!shouldInitializeFromFilesystem(pendingInstants, inflightInstantTimestamp)) {
      return false;
    }

    // FILES is mandatory and has to be built before any other partition.
    final boolean filesReady = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES);
    if (filesReady) {
      // Open the reader first so the existing FILES partition can be consulted while the other partitions are built.
      initMetadataReader();
      // Only build the MDT meta client when nobody has done so yet.
      if (metadataMetaClient == null) {
        metadataMetaClient = HoodieTableMetaClient.builder()
            .setConf(storageConf.newInstance())
            .setBasePath(metadataWriteConfig.getBasePath())
            .setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig())
            .build();
      }
    } else {
      // Make sure FILES sits at the head of the list exactly once.
      partitionsToInit.remove(FILES);
      partitionsToInit.add(0, FILES);
      // First-time creation of the metadata table.
      metadataMetaClient = initializeMetaClient();
    }

    // Nothing to do for partitions that the table config already marks as available.
    partitionsToInit.removeIf(type -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable(type));

    // Source of the partition/file listing: the MDT itself when FILES exists, else the file system
    // (only when auto initialization is on), else nothing at all.
    final List<DirectoryInfo> directoryInfos;
    if (filesReady) {
      directoryInfos = listAllPartitionsFromMDT(dataTableInstantTime, pendingInstants);
    } else if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
      directoryInfos = listAllPartitionsFromFilesystem(dataTableInstantTime, pendingInstants);
    } else {
      directoryInfos = Collections.emptyList();
    }

    // FILES-partition identifier -> (file name -> size) for every listed directory.
    final Map<String, Map<String, Long>> filesByPartition = directoryInfos.stream()
        .collect(Collectors.toMap(
            info -> HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(info.getRelativePath()),
            info -> info.getFileNameToSizeMap()));

    // Eligibility check: partition stats makes no sense on a non-partitioned table.
    for (Iterator<MetadataPartitionType> candidates = partitionsToInit.iterator(); candidates.hasNext();) {
      MetadataPartitionType candidate = candidates.next();
      if (candidate != PARTITION_STATS || dataMetaClient.getTableConfig().isTablePartitioned()) {
        continue;
      }
      LOG.warn("Partition stats index cannot be enabled for a non-partitioned table. Removing from initialization list. Please disable {}",
          HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key());
      candidates.remove();
      this.enabledPartitionTypes.remove(candidate);
    }

    for (MetadataPartitionType indexType : partitionsToInit) {
      // Every partition is committed under its own unique instant time.
      String partitionInstantTime = generateUniqueInstantTime(dataTableInstantTime);
      String typeLabel = indexType.name();
      LOG.info("Initializing MDT partition {} at instant {}", typeLabel, partitionInstantTime);

      String mdtPartitionPath;
      Pair<Integer, HoodieData<HoodieRecord>> countAndRecords;
      List<String> indexedColumns = new ArrayList<>();
      try {
        switch (indexType) {
          case FILES: {
            countAndRecords = initializeFilesPartition(directoryInfos);
            mdtPartitionPath = FILES.getPartitionPath();
            break;
          }
          case BLOOM_FILTERS: {
            countAndRecords = initializeBloomFiltersPartition(dataTableInstantTime, filesByPartition);
            mdtPartitionPath = BLOOM_FILTERS.getPartitionPath();
            break;
          }
          case COLUMN_STATS: {
            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> columnsWithRecords = initializeColumnStatsPartition(filesByPartition);
            indexedColumns = columnsWithRecords.getKey();
            countAndRecords = columnsWithRecords.getValue();
            mdtPartitionPath = COLUMN_STATS.getPartitionPath();
            break;
          }
          case RECORD_INDEX: {
            countAndRecords = initializeRecordIndexPartition();
            mdtPartitionPath = RECORD_INDEX.getPartitionPath();
            break;
          }
          case EXPRESSION_INDEX: {
            Set<String> pendingExpressionIndexes = getExpressionIndexPartitionsToInit(indexType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
            int pendingCount = pendingExpressionIndexes.size();
            if (pendingCount > 1) {
              LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", pendingExpressionIndexes);
            }
            // Exactly one pending index is required; zero is skipped silently.
            if (pendingCount != 1) {
              continue;
            }
            mdtPartitionPath = pendingExpressionIndexes.iterator().next();
            countAndRecords = initializeExpressionIndexPartition(mdtPartitionPath, dataTableInstantTime);
            break;
          }
          case PARTITION_STATS: {
            // Partition stats is only built when the column stats index is enabled as well.
            if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
              LOG.warn("Skipping partition stats initialization as column stats index is not enabled. Please enable {}",
                  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
              continue;
            }
            countAndRecords = initializePartitionStatsIndex();
            mdtPartitionPath = PARTITION_STATS.getPartitionPath();
            break;
          }
          case SECONDARY_INDEX: {
            Set<String> pendingSecondaryIndexes = getSecondaryIndexPartitionsToInit(indexType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
            int pendingCount = pendingSecondaryIndexes.size();
            if (pendingCount > 1) {
              LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", pendingSecondaryIndexes);
            }
            // Exactly one pending index is required; zero is skipped silently.
            if (pendingCount != 1) {
              continue;
            }
            mdtPartitionPath = pendingSecondaryIndexes.iterator().next();
            countAndRecords = initializeSecondaryIndexPartition(mdtPartitionPath);
            break;
          }
          default:
            throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", indexType));
        }
      } catch (Exception e) {
        // Flag the failure in the metrics (when present), then surface it as a metadata exception.
        String errorMetricName = indexType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
        metrics.ifPresent(registry -> registry.setMetric(errorMetricName, 1));
        String failureMessage = String.format("Bootstrap on %s partition failed for %s",
            indexType.getPartitionPath(), metadataMetaClient.getBasePath());
        LOG.error(failureMessage, e);
        throw new HoodieMetadataException(failureMessage, e);
      }

      if (LOG.isInfoEnabled()) {
        LOG.info("Initializing {} index with {} mappings", typeLabel, countAndRecords.getKey());
      }
      HoodieTimer initTimer = HoodieTimer.start();

      // Create the file groups for this partition.
      final int fileGroupCount = countAndRecords.getKey();
      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + typeLabel + " should be > 0");
      initializeFileGroups(dataMetaClient, indexType, partitionInstantTime, fileGroupCount, mdtPartitionPath);

      // Write all records in a single bulk commit.
      bulkCommit(partitionInstantTime, mdtPartitionPath, countAndRecords.getValue(), fileGroupCount);
      if (indexType == COLUMN_STATS) {
        // Record which columns the column stats index was built for.
        updateColumnsToIndexWithColStats(indexedColumns);
      }
      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, mdtPartitionPath, true);
      // Re-open the reader so the freshly built partition is visible to the following iterations.
      initMetadataReader();
      LOG.info("Initializing {} index in metadata table took {} in ms", typeLabel, initTimer.endTimer());
    }
    return true;
  }