in aios/apps/facility/build_service/build_service/admin/GenerationTask.cpp [959:1101]
/// Switches this generation to the config located at `configPath`.
///
/// Steps, in order:
///   1. Refuse while the generation is suspending or rolling back, or when
///      `configPath` normalizes to the same directory as the latest config.
///   2. Load the new config, prepare the checkpoint synchronizer and reject a
///      v2 -> v1 build-mode change.
///   3. Check the schema change of every cluster and collect the schema id
///      (and modify-operation infos, if any) of each cluster whose schema changed.
///   4. Register the new config, load the v1 -> v2 upgrade graph when the build
///      mode changes, run the schema update unless the graph config sets
///      "disableIncBuilder" to "true", then apply the config and package info.
///
/// @param configPath root path of the new config.
/// @return true when every step succeeded, false on the first failure.
///         Steps already performed before a failure are not undone here.
bool GenerationTask::doUpdateConfig(const string& configPath)
{
    if (isSuspending()) {
        REPORT(SERVICE_ERROR_NONE, "can't update config when generation is suspending.");
        return false;
    }
    if (isRollingBack()) {
        REPORT(SERVICE_ERROR_NONE, "can't update config when generation is rolling back.");
        return false;
    }

    ConfigReaderAccessorPtr configReaderAccessor;
    _resourceManager->getResource(configReaderAccessor);
    ResourceReaderPtr oldResourceReader = configReaderAccessor->getLatestConfig();
    if (fslib::util::FileUtil::normalizeDir(configPath) ==
        fslib::util::FileUtil::normalizeDir(oldResourceReader->getConfigPath())) {
        REPORT(SERVICE_ERROR_CONFIG, "can't update config with the same configPath");
        return false;
    }

    vector<string> allClusters;
    if (!getAllClusterNames(allClusters)) {
        REPORT(SERVICE_ERROR_CONFIG, "get clusters failed for [%s]", _buildId.ShortDebugString().c_str());
        return false;
    }

    ResourceReaderPtr newResourceReader(new ResourceReader(configPath));
    newResourceReader->init();
    if (!prepareCheckpointSynchronizer(newResourceReader)) {
        BS_LOG(ERROR, "upc init checkpoint synchronizer [%s] failed", _buildId.ShortDebugString().c_str());
        return false;
    }

    // Build-mode transition: v1 -> v2 is allowed (with restrictions checked
    // further down), v2 -> v1 is rejected outright.
    const bool isOldV2BuildMode = _isV2Build;
    bool hasError = false;
    const bool isNewV2BuildMode = isV2Build(newResourceReader, hasError);
    if (hasError) {
        REPORT(SERVICE_ERROR_CONFIG, "read new config failed, reject update config");
        return false;
    }
    if (isOldV2BuildMode && !isNewV2BuildMode) {
        REPORT(SERVICE_ERROR_CONFIG, "not support v2 build mode to v1 build mode");
        return false;
    }
    const bool isV1BuildModeToV2BuildMode = !isOldV2BuildMode && isNewV2BuildMode;

    vector<string> chgSchemaClusters; // clusters whose schema changed
    KeyValueMap schemaIdMap;          // cluster name -> new schema id (as string)
    KeyValueMap opsIdMap;             // cluster name -> modify-operation infos
    // The two kinds of schema change are tracked separately so that mixing
    // them is rejected regardless of the order in which clusters are visited.
    bool hasOperations = false;
    bool hasPlainSchemaChange = false;
    for (const auto& clusterName : allClusters) {
        const ConfigValidator::SchemaUpdateStatus status = checkUpdateSchema(newResourceReader, clusterName);
        if (status == ConfigValidator::SchemaUpdateStatus::UPDATE_ILLEGAL) {
            REPORT(SERVICE_ERROR_CONFIG, "check update schema failed");
            return false;
        }
        if (status != ConfigValidator::SchemaUpdateStatus::UPDATE_SCHEMA) {
            continue;
        }

        chgSchemaClusters.push_back(clusterName);
        auto tabletSchema = newResourceReader->getTabletSchema(clusterName);
        if (!tabletSchema) {
            // Dereferenced below; fail cleanly instead of crashing.
            REPORT(SERVICE_ERROR_CONFIG, "get tablet schema failed for cluster [%s]", clusterName.c_str());
            return false;
        }
        auto schema = tabletSchema->GetLegacySchema();

        if (isNewV2BuildMode) {
            IndexCheckpointAccessorPtr indexCheckpointAccessor =
                CheckpointCreator::createIndexCheckpointAccessor(_resourceManager);
            ::google::protobuf::RepeatedPtrField<proto::IndexInfo> indexInfos;
            if (!indexCheckpointAccessor->getIndexInfo(false, clusterName, indexInfos)) {
                REPORT(SERVICE_ERROR_CONFIG, "get index info failed");
                return false;
            }
            if (!checkReAddIndex(clusterName, tabletSchema, indexInfos)) {
                REPORT(SERVICE_ERROR_CONFIG, "check update schema failed, "
                                             "new schema add same index while old index still exist, "
                                             "please wait old index deleted then readd it");
                return false;
            }
        }

        if (schema && schema->HasModifyOperations()) {
            hasOperations = true;
            opsIdMap[clusterName] = getOpsInfos(schema, clusterName);
        } else {
            hasPlainSchemaChange = true;
        }
        if (hasOperations && hasPlainSchemaChange) {
            REPORT(SERVICE_ERROR_CONFIG, "multi cluster change schema not support"
                                         " two change schame types together");
            return false;
        }
        schemaIdMap[clusterName] = StringUtil::toString(tabletSchema->GetSchemaId());
        // TODO (carried over from the original note): also check that a newly
        // added index does not exist in the current index.
    }

    // NOTE(review): from here on a failure leaves the new config registered in
    // the accessor; confirm the caller discards or restores the task state.
    configReaderAccessor->addConfig(newResourceReader, true);

    if (isV1BuildModeToV2BuildMode) {
        if (!chgSchemaClusters.empty()) {
            BS_LOG(ERROR, "v1 builde mode to v2 build mode not support change schema");
            return false;
        }
        if (getBuildStep() == proto::BUILD_STEP_FULL) {
            BS_LOG(ERROR, "full building not support v1 build mode to v2 build mode");
            return false;
        }
        string updateGraph = "BuildV1ToBuildV2.graph";
        KeyValueMap kvMap;
        kvMap["clusterNames"] = ToJsonString(allClusters, true);
        kvMap["buildStep"] = config::BUILD_STEP_INC_STR;
        kvMap["buildId"] = ProtoUtil::buildIdToStr(_buildId);
        // The data description is passed to the graph only when no flow is
        // tagged as an inc processor.
        vector<string> flowIds;
        string incProcessorTag = "BSIncProcessor";
        _taskFlowManager->getFlowIdByTag(incProcessorTag, flowIds);
        if (flowIds.empty()) {
            kvMap[DATA_DESCRIPTION_KEY] = ToJsonString(_realTimeDataDesc);
        }
        if (!_taskFlowManager->loadSubGraph("", updateGraph, kvMap)) {
            BS_LOG(ERROR, "load graph [%s] fail", updateGraph.c_str());
            return false;
        }
    }

    GraphConfig graphConfig;
    auto status = newResourceReader->getGraphConfigReturnStatus(graphConfig);
    if (status == ResourceReader::ERROR) {
        BS_LOG(ERROR, "load graph failed");
        return false;
    }
    // The inc builder is considered present unless the graph config sets
    // "disableIncBuilder" to exactly "true".
    const auto& graphParam = graphConfig.getGraphParam();
    auto iter = graphParam.find("disableIncBuilder");
    const bool hasIncBuilder = !(iter != graphParam.end() && iter->second == "true");

    if (hasIncBuilder && !chgSchemaClusters.empty() &&
        !doUpdateSchema(hasOperations, chgSchemaClusters, schemaIdMap, opsIdMap)) {
        return false;
    }
    if (!innerUpdateConfig(configPath)) {
        return false;
    }
    if (!updatePackageInfo(oldResourceReader, newResourceReader)) {
        return false;
    }
    _isV2Build = isNewV2BuildMode;
    return true;
}