in aios/apps/facility/build_service/build_service/admin/GenerationTask.cpp [298:467]
bool GenerationTask::doLoadFromConfig(const string& configPath, const string& generationDir,
const DataDescriptions& dataDescriptions, BuildStep buildStep)
{
ResourceReaderPtr resourceReader(new ResourceReader(configPath));
resourceReader->init();
ConfigReaderAccessorPtr configResource;
_resourceManager->getResource(configResource);
if (!configResource->addConfig(resourceReader, true)) {
BS_LOG(ERROR, "GenerationTask addConfig failed, configPath[%s]", configPath.c_str());
return false;
}
vector<string> clusters;
if (!getAllClusterNames(clusters)) {
REPORT(SERVICE_ERROR_CONFIG, "get clusters failed for [%s]", _buildId.ShortDebugString().c_str());
return false;
}
KeyValueMap kvMap;
auto dsVec = dataDescriptions.toVector();
if (!dataDescriptions.empty()) {
DataDescription& incDs = const_cast<DataDescription&>(dsVec.back());
if (DataSourceHelper::isRealtime(incDs)) {
if (incDs.find(SWIFT_STOP_TIMESTAMP) == incDs.end()) {
_realTimeDataDesc = incDs;
_realTimeDataDesc[config::SRC_SIGNATURE] = StringUtil::toString(dataDescriptions.size() - 1);
BS_LOG(INFO, "find realtime swift-like info for generation task [%s].",
_buildId.ShortDebugString().c_str());
}
if (incDs.find(SRC_BATCH_MODE) != incDs.end() && incDs[SRC_BATCH_MODE] == "true") {
_batchMode = true;
}
}
DataDescription& firstDs = const_cast<DataDescription&>(dsVec.front());
if (firstDs[READ_SRC_TYPE] == INDEX_CLONE_SRC) {
if (dsVec.size() != 2) {
BS_LOG(ERROR, "no incremental dataDescription defined in task [%s].",
_buildId.ShortDebugString().c_str());
return false;
}
if (!DataSourceHelper::isRealtime(_realTimeDataDesc)) {
BS_LOG(ERROR, "no swift-like-type incremental dataDescription defined in task [%s].",
_buildId.ShortDebugString().c_str());
return false;
}
auto it = firstDs.find(SOURCE_INDEX_ADMIN_ADDRESS);
if (it == firstDs.end()) {
BS_LOG(ERROR, "cannot find [%s] for src[%s] in generation task [%s].",
SOURCE_INDEX_ADMIN_ADDRESS.c_str(), INDEX_CLONE_SRC.c_str(),
_buildId.ShortDebugString().c_str());
return false;
}
kvMap[SOURCE_INDEX_ADMIN_ADDRESS] = it->second;
it = firstDs.find(SOURCE_INDEX_BUILD_ID);
if (it == firstDs.end()) {
BS_LOG(ERROR, "cannot find [%s] for src[%s] in generation task [%s].", SOURCE_INDEX_BUILD_ID.c_str(),
INDEX_CLONE_SRC.c_str(), _buildId.ShortDebugString().c_str());
return false;
}
kvMap[SOURCE_INDEX_BUILD_ID] = it->second;
it = firstDs.find(MADROX_ADMIN_ADDRESS);
if (it == firstDs.end()) {
BS_LOG(ERROR, "cannot find [%s] for src[%s] in generation task [%s].", MADROX_ADMIN_ADDRESS.c_str(),
INDEX_CLONE_SRC.c_str(), _buildId.ShortDebugString().c_str());
return false;
}
kvMap[MADROX_ADMIN_ADDRESS] = it->second;
kvMap["needCloneIndex"] = "true";
}
}
KeyValueMap schemaIdMap;
for (auto cluster : clusters) {
auto schema = resourceReader->getTabletSchema(cluster);
if (!schema) {
REPORT(SERVICE_ERROR_CONFIG, "read [%s] schema failed", cluster.c_str());
return false;
}
schemaIdMap[cluster] = StringUtil::toString(schema->GetSchemaId());
}
kvMap["dataDescriptions"] = ToJsonString(dsVec, true);
kvMap["realtimeDataDescription"] = ToJsonString(_realTimeDataDesc, true);
if (_realTimeDataDesc.empty()) {
kvMap["hasRealtimeDataDesc"] = "false";
} else {
kvMap["hasRealtimeDataDesc"] = "true";
}
kvMap["buildId"] = ProtoUtil::buildIdToStr(_buildId);
kvMap["buildStep"] = buildStep == proto::BUILD_STEP_FULL ? config::BUILD_STEP_FULL_STR : config::BUILD_STEP_INC_STR;
kvMap["clusterNames"] = ToJsonString(clusters, true);
kvMap["schemaIdMap"] = ToJsonString(schemaIdMap, true);
kvMap["configPath"] = configPath;
if (_isImportedTask) {
kvMap["useRandomInitialPathVersion"] = "true";
KeyValueMap tmpIdStrMap;
for (const auto& [clusterName, versionId] : _importedVersionIdMap) {
tmpIdStrMap[clusterName] = autil::StringUtil::toString(versionId);
}
kvMap["importedVersionIdMap"] = ToJsonString(tmpIdStrMap, true);
}
config::ControlConfig controlConfig;
if (!resourceReader->getDataTableConfigWithJsonPath(_buildId.datatable(), "control_config", controlConfig)) {
BS_LOG(ERROR, "get control_config.is_inc_processor_exist failed from dataTable[%s] failed",
_buildId.datatable().c_str());
return false;
}
if (_batchMode && buildStep == BUILD_STEP_INC) {
REPORT(SERVICE_ERROR_CONFIG, "batch mode not allow start from inc step, buildId: %s",
_buildId.ShortDebugString().c_str());
return false;
}
string targetGraph;
GraphConfig graphConfig;
if (resourceReader->getGraphConfig(graphConfig)) {
targetGraph = graphConfig.getGraphName();
const KeyValueMap& params = graphConfig.getGraphParam();
vector<string> graphParamKeys;
auto iter = params.begin();
for (; iter != params.end(); iter++) {
kvMap[iter->first] = iter->second;
graphParamKeys.push_back(iter->first);
}
kvMap["user_define_parameter_keys"] = ToJsonString(graphParamKeys, true);
} else {
if (controlConfig.getDataLinkMode() == ControlConfig::DataLinkMode::NPC_MODE) {
targetGraph = "BuildIndexV2.npc_mode.graph";
} else {
if (controlConfig.useIndexV2() || _forceUseTabletV2Build) {
targetGraph = _batchMode ? "BatchBuildV2/FullBatchBuild.graph" : "BuildIndexV2.graph";
} else {
targetGraph = _batchMode ? "BatchBuild/FullBatchBuild.graph" : "BuildIndex.graph";
}
}
}
if (!DataLinkModeUtil::addGraphParameters(controlConfig, clusters, dsVec, &kvMap)) {
REPORT(SERVICE_ERROR_CONFIG, "parse graph parameters failed, buildId: %s", _buildId.ShortDebugString().c_str());
return false;
}
if (!prepareCheckpointSynchronizer(resourceReader)) {
REPORT(SERVICE_ERROR_CONFIG, "init checkpoint synchronizer [%s] failed", _buildId.ShortDebugString().c_str());
return false;
}
if (!_taskFlowManager->loadSubGraph("", targetGraph, kvMap)) {
REPORT(SERVICE_ERROR_CONFIG, "init [%s] graph failed", _buildId.ShortDebugString().c_str());
return false;
}
_taskFlowManager->stepRun();
BuildServiceConfig buildServiceConfig;
if (!resourceReader->getConfigWithJsonPath("build_app.json", "", buildServiceConfig)) {
string errorMsg = "failed to get build service config in config[" + getConfigPath() + "]";
REPORT(SERVICE_ERROR_CONFIG, "%s", errorMsg.c_str());
return false;
}
_counterConfig = buildServiceConfig.counterConfig;
_currentProcessorStep = buildStep;
_dataDescriptionsStr = ToJsonString(dsVec, true);
bool hasError = false;
bool v2Build = isV2Build(resourceReader, hasError);
if (hasError) {
REPORT(SERVICE_ERROR_CONFIG, "read config failed, reject update config");
return false;
}
_isV2Build = v2Build;
return true;
}