in hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java [361:516]
/**
 * Bootstraps the requested metadata table (MDT) partitions from a listing of the data table.
 *
 * <p>The listing comes from the already initialized FILES partition of the MDT when it is available,
 * otherwise from the file system (only when auto initialization is enabled; an empty listing is used
 * when it is disabled). When FILES is not yet available it is moved to the front of
 * {@code partitionsToInit} so it is initialized first. Partitions already available in the data
 * table config are dropped, as is PARTITION_STATS for non-partitioned tables (which is also removed
 * from {@code enabledPartitionTypes}).
 *
 * <p>Each remaining partition gets its own MDT instant from {@code generateUniqueInstantTime}. Its
 * records are generated, its file groups are created, and the records are bulk committed. The
 * partition is then marked available in the data table config and the metadata reader is re-opened.
 * Some partitions are skipped without initialization:
 * <ul>
 *   <li>expression and secondary indexes when the number of candidate index partitions is not exactly one;</li>
 *   <li>PARTITION_STATS when column stats is disabled.</li>
 * </ul>
 *
 * @param dataTableInstantTime     data table instant used as the base for the MDT initialization instants
 * @param partitionsToInit         MDT partitions requested for initialization; NOTE: modified in place
 *                                 (reordered / filtered), so it must be a mutable list
 * @param inflightInstantTimestamp optional in-flight instant passed to {@code shouldInitializeFromFilesystem}
 * @return {@code false} when {@code shouldInitializeFromFilesystem} declines initialization, {@code true} otherwise
 * @throws IOException             as declared; may be propagated from the helpers invoked here
 * @throws HoodieMetadataException when bootstrapping any individual MDT partition fails
 */
private boolean initializeFromFilesystem(String dataTableInstantTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
  Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
  if (!shouldInitializeFromFilesystem(pendingDataInstants, inflightInstantTimestamp)) {
    return false;
  }

  // FILES partition is always required and is initialized first
  boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES);
  if (!filesPartitionAvailable) {
    partitionsToInit.remove(FILES);
    partitionsToInit.add(0, FILES);
    // Initialize the metadata table for the first time
    metadataMetaClient = initializeMetaClient();
  } else {
    // Check and then open the metadata table reader so FILES partition can be read during initialization of other partitions
    initMetadataReader();
    // Load the metadata table metaclient if required
    if (metadataMetaClient == null) {
      metadataMetaClient = HoodieTableMetaClient.builder()
          .setConf(storageConf.newInstance()).setBasePath(metadataWriteConfig.getBasePath())
          .setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig()).build();
    }
  }

  // Already initialized partitions can be ignored
  partitionsToInit.removeIf(metadataPartition -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition)));

  // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
  List<DirectoryInfo> partitionInfoList;
  if (filesPartitionAvailable) {
    partitionInfoList = listAllPartitionsFromMDT(dataTableInstantTime, pendingDataInstants);
  } else {
    // if auto initialization is enabled, then we need to list all partitions from the file system
    if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
      partitionInfoList = listAllPartitionsFromFilesystem(dataTableInstantTime, pendingDataInstants);
    } else {
      // if auto initialization is disabled, we can return an empty list
      partitionInfoList = Collections.emptyList();
    }
  }

  // FILES-partition identifier of each data partition -> (file name -> file size)
  Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
      .map(p -> {
        String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
        return Pair.of(partitionName, p.getFileNameToSizeMap());
      })
      .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

  // validate that each index is eligible to be initialized
  Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
  while (iterator.hasNext()) {
    MetadataPartitionType partitionType = iterator.next();
    if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) {
      LOG.warn("Partition stats index cannot be enabled for a non-partitioned table. Removing from initialization list. Please disable {}",
          HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key());
      iterator.remove();
      this.enabledPartitionTypes.remove(partitionType);
    }
  }

  for (MetadataPartitionType partitionType : partitionsToInit) {
    String partitionTypeName = partitionType.name();
    String partitionName;
    Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
    // Only populated (and only used) for COLUMN_STATS
    List<String> columnsToIndex = new ArrayList<>();
    try {
      switch (partitionType) {
        case FILES:
          fileGroupCountAndRecordsPair = initializeFilesPartition(partitionInfoList);
          partitionName = FILES.getPartitionPath();
          break;
        case BLOOM_FILTERS:
          fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(dataTableInstantTime, partitionToFilesMap);
          partitionName = BLOOM_FILTERS.getPartitionPath();
          break;
        case COLUMN_STATS:
          Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionToFilesMap);
          columnsToIndex = colStatsColumnsAndRecord.getKey();
          fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
          partitionName = COLUMN_STATS.getPartitionPath();
          break;
        case RECORD_INDEX:
          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
          partitionName = RECORD_INDEX.getPartitionPath();
          break;
        case EXPRESSION_INDEX:
          Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
          if (expressionIndexPartitionsToInit.size() != 1) {
            if (expressionIndexPartitionsToInit.size() > 1) {
              LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit);
            }
            continue;
          }
          partitionName = expressionIndexPartitionsToInit.iterator().next();
          fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(partitionName, dataTableInstantTime);
          break;
        case PARTITION_STATS:
          // For PARTITION_STATS, COLUMN_STATS should also be enabled
          if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
            LOG.warn("Skipping partition stats initialization as column stats index is not enabled. Please enable {}",
                HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
            continue;
          }
          fileGroupCountAndRecordsPair = initializePartitionStatsIndex();
          partitionName = PARTITION_STATS.getPartitionPath();
          break;
        case SECONDARY_INDEX:
          Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
          if (secondaryIndexPartitionsToInit.size() != 1) {
            if (secondaryIndexPartitionsToInit.size() > 1) {
              LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit);
            }
            continue;
          }
          partitionName = secondaryIndexPartitionsToInit.iterator().next();
          fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition(partitionName);
          break;
        default:
          throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType));
      }
    } catch (Exception e) {
      String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
      metrics.ifPresent(m -> m.setMetric(metricKey, 1));
      String errMsg = String.format("Bootstrap on %s partition failed for %s",
          partitionType.getPartitionPath(), metadataMetaClient.getBasePath());
      LOG.error(errMsg, e);
      throw new HoodieMetadataException(errMsg, e);
    }

    // Pick the MDT commit time only once the partition is known to be initialized, so partitions skipped
    // above via `continue` neither compute nor log an initialization instant.
    // Each initialization uses its own unique commit time. The switch above only uses dataTableInstantTime.
    String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime);
    LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition);
    // Parameterized logging already defers formatting; no isInfoEnabled() guard needed
    LOG.info("Initializing {} index with {} mappings", partitionTypeName, fileGroupCountAndRecordsPair.getKey());

    HoodieTimer partitionInitTimer = HoodieTimer.start();
    // Generate the file groups
    final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
    ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionTypeName + " should be > 0");
    initializeFileGroups(dataMetaClient, partitionType, instantTimeForPartition, fileGroupCount, partitionName);

    // Perform the commit using bulkCommit
    HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
    bulkCommit(instantTimeForPartition, partitionName, records, fileGroupCount);
    if (partitionType == COLUMN_STATS) {
      // initialize Col Stats index definition
      updateColumnsToIndexWithColStats(columnsToIndex);
    }
    dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionName, true);
    // initialize the metadata reader again so the MDT partition can be read after initialization
    initMetadataReader();
    long totalInitTime = partitionInitTimer.endTimer();
    LOG.info("Initializing {} index in metadata table took {} in ms", partitionTypeName, totalInitTime);
  }
  return true;
}