in fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java [168:244]
private static NewTableMetadata getTableMetadataToUpdate(
Cluster cluster,
MetadataResponse metadataResponse,
Map<Integer, ServerNode> newAliveTableServers) {
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
Map<TablePath, TableInfo> newTablePathToTableInfo = new HashMap<>();
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations = new HashMap<>();
Map<PhysicalTablePath, Long> newPartitionIdByPath = new HashMap<>();
// iterate all table metadata
List<PbTableMetadata> pbTableMetadataList = metadataResponse.getTableMetadatasList();
pbTableMetadataList.forEach(
pbTableMetadata -> {
// get table info for the table
long tableId = pbTableMetadata.getTableId();
PbTablePath protoTablePath = pbTableMetadata.getTablePath();
TablePath tablePath =
new TablePath(
protoTablePath.getDatabaseName(),
protoTablePath.getTableName());
newTablePathToTableId.put(tablePath, tableId);
TableDescriptor tableDescriptor =
TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson());
newTablePathToTableInfo.put(
tablePath,
TableInfo.of(
tablePath,
pbTableMetadata.getTableId(),
pbTableMetadata.getSchemaId(),
tableDescriptor,
pbTableMetadata.getCreatedTime(),
pbTableMetadata.getModifiedTime()));
// Get all buckets for the table.
List<PbBucketMetadata> pbBucketMetadataList =
pbTableMetadata.getBucketMetadatasList();
newBucketLocations.put(
PhysicalTablePath.of(tablePath),
toBucketLocations(
tablePath,
tableId,
null,
null,
pbBucketMetadataList,
newAliveTableServers));
});
List<PbPartitionMetadata> pbPartitionMetadataList =
metadataResponse.getPartitionMetadatasList();
// iterate all partition metadata
pbPartitionMetadataList.forEach(
pbPartitionMetadata -> {
long tableId = pbPartitionMetadata.getTableId();
// the table path should be initialized at begin
TablePath tablePath = cluster.getTablePathOrElseThrow(tableId);
PhysicalTablePath physicalTablePath =
PhysicalTablePath.of(tablePath, pbPartitionMetadata.getPartitionName());
newPartitionIdByPath.put(
physicalTablePath, pbPartitionMetadata.getPartitionId());
newBucketLocations.put(
physicalTablePath,
toBucketLocations(
tablePath,
tableId,
pbPartitionMetadata.getPartitionId(),
pbPartitionMetadata.getPartitionName(),
pbPartitionMetadata.getBucketMetadatasList(),
newAliveTableServers));
});
return new NewTableMetadata(
newTablePathToTableId,
newTablePathToTableInfo,
newBucketLocations,
newPartitionIdByPath);
}