in manager/manager/src/main/java/org/apache/doris/stack/component/ManagerMetaSyncComponent.java [76:177]
public void syncPaloClusterMetadata(ClusterInfoEntity clusterInfo) throws Exception {
long clusterId = clusterInfo.getId();
try {
log.info("Start to sync palo cluster {} metadata to manager.", clusterId);
List<String> databaseList = metaInfoClient.getDatabaseList(ConstantDef.DORIS_DEFAULT_NS, clusterInfo);
// Get the list of all databases in Doris cluster
List<String> oldDatabaseList = databaseRepository.getNameByClusterId(clusterId);
// Get the newly added database, and the corresponding table and field are newly added
List<String> addDbList = ListUtil.getAddList(databaseList, oldDatabaseList);
log.info("Get new database {}.", addDbList);
for (String db : addDbList) {
if (db.equals(ConstantDef.MYSQL_DEFAULT_SCHEMA)) {
continue;
}
int dbId = addDatabase(db, "", 0, clusterId);
List<String> tableList = metaInfoClient.getTableList(ConstantDef.DORIS_DEFAULT_NS, db, clusterInfo);
for (String table : tableList) {
TableSchemaInfo.TableSchema tableSchema =
metaInfoClient.getTableBaseSchema(ConstantDef.DORIS_DEFAULT_NS, db, table, clusterInfo);
int tableId = addTable(dbId, table, "", tableSchema);
addTableFieldList(tableId, tableSchema);
}
}
// Get the reduced database, and delete the corresponding table and field
List<String> reduceDbList = ListUtil.getReduceList(databaseList, oldDatabaseList);
log.info("Get reduce database {}.", reduceDbList);
for (String db : reduceDbList) {
deleteDatabase(db, clusterId);
}
// Get the original database, compare whether the table and field in it are updated,
// and modify them if they are updated
List<String> existDbList = ListUtil.getExistList(databaseList, oldDatabaseList);
log.info("Get exist database {}.", existDbList);
for (String db : existDbList) {
ManagerDatabaseEntity databaseEntity = databaseRepository.getByClusterIdAndName(clusterId, db).get(0);
int dbId = databaseEntity.getId();
log.debug("update database {}", dbId);
List<String> tableList = metaInfoClient.getTableList(ConstantDef.DORIS_DEFAULT_NS, db, clusterInfo);
List<ManagerTableEntity> tableEntities = tableRepository.getByDbId(dbId);
List<String> oldTableList = new ArrayList<>();
for (ManagerTableEntity tableEntity : tableEntities) {
oldTableList.add(tableEntity.getName());
}
// Get the newly added table and get the corresponding newly added field information
List<String> addTableList = ListUtil.getAddList(tableList, oldTableList);
log.info("Get new table {}.", addTableList);
for (String table : addTableList) {
TableSchemaInfo.TableSchema tableSchema =
metaInfoClient.getTableBaseSchema(ConstantDef.DORIS_DEFAULT_NS, db, table, clusterInfo);
int tableId = addTable(dbId, table, "", tableSchema);
addTableFieldList(tableId, tableSchema);
}
// Get the deleted table and delete the corresponding field information
List<String> reduceTableList = ListUtil.getReduceList(tableList, oldTableList);
log.info("Get reduce table {}.", reduceTableList);
for (String table : reduceTableList) {
int tableId = getTableId(tableEntities, table);
deleteTable(tableId);
}
// Get the original table and compare whether the field is updated. If it is updated, modify it.
// Here, the way to modify the field information is to delete it all first and then store it again
List<String> existTableList = ListUtil.getExistList(tableList, oldTableList);
log.info("Get exist table {}.", existTableList);
for (String table : existTableList) {
TableSchemaInfo.TableSchema tableSchema =
metaInfoClient.getTableBaseSchema(ConstantDef.DORIS_DEFAULT_NS, db, table, clusterInfo);
List<String> tableFields = tableSchema.fieldList();
int tableId = getTableId(tableEntities, table);
List<ManagerFieldEntity> fieldEntities = fieldRepository.getByTableId(tableId);
List<String> oldTableFields = new ArrayList<>();
for (ManagerFieldEntity fieldEntity : fieldEntities) {
oldTableFields.add(fieldEntity.getName());
}
List<String> existTableFields = ListUtil.getExistList(tableFields, oldTableFields);
// If the field field information of the table is not modified
// TODO:At present, only the number and name of fields are compared
if (existTableFields.size() == tableFields.size() && tableFields.size() == oldTableFields.size()) {
log.debug("The table {} field information has not been modified.", tableId);
} else {
log.debug("The table {} field information has been modified, update fields", tableId);
fieldRepository.deleteByTableId(tableId);
addTableFieldList(tableId, tableSchema);
}
}
}
log.info("End to sync palo cluster {} metadata to studio.", clusterId);
} catch (Exception e) {
log.error("Sync palo cluster {} metadata exception {}", clusterId, e);
throw new MetaDataSyncException("Sync palo cluster metadata error.");
}
}