bool GenerationTask::doLoadFromConfig()

in aios/apps/facility/build_service/build_service/admin/GenerationTask.cpp [298:467]


bool GenerationTask::doLoadFromConfig(const string& configPath, const string& generationDir,
                                      const DataDescriptions& dataDescriptions, BuildStep buildStep)
{
    ResourceReaderPtr resourceReader(new ResourceReader(configPath));
    resourceReader->init();
    ConfigReaderAccessorPtr configResource;
    _resourceManager->getResource(configResource);
    if (!configResource->addConfig(resourceReader, true)) {
        BS_LOG(ERROR, "GenerationTask addConfig failed, configPath[%s]", configPath.c_str());
        return false;
    }

    vector<string> clusters;
    if (!getAllClusterNames(clusters)) {
        REPORT(SERVICE_ERROR_CONFIG, "get clusters failed for [%s]", _buildId.ShortDebugString().c_str());
        return false;
    }
    KeyValueMap kvMap;
    auto dsVec = dataDescriptions.toVector();
    if (!dataDescriptions.empty()) {
        DataDescription& incDs = const_cast<DataDescription&>(dsVec.back());
        if (DataSourceHelper::isRealtime(incDs)) {
            if (incDs.find(SWIFT_STOP_TIMESTAMP) == incDs.end()) {
                _realTimeDataDesc = incDs;
                _realTimeDataDesc[config::SRC_SIGNATURE] = StringUtil::toString(dataDescriptions.size() - 1);
                BS_LOG(INFO, "find realtime swift-like info for generation task [%s].",
                       _buildId.ShortDebugString().c_str());
            }
            if (incDs.find(SRC_BATCH_MODE) != incDs.end() && incDs[SRC_BATCH_MODE] == "true") {
                _batchMode = true;
            }
        }

        DataDescription& firstDs = const_cast<DataDescription&>(dsVec.front());
        if (firstDs[READ_SRC_TYPE] == INDEX_CLONE_SRC) {
            if (dsVec.size() != 2) {
                BS_LOG(ERROR, "no incremental dataDescription defined in task [%s].",
                       _buildId.ShortDebugString().c_str());
                return false;
            }
            if (!DataSourceHelper::isRealtime(_realTimeDataDesc)) {
                BS_LOG(ERROR, "no swift-like-type incremental dataDescription defined in task [%s].",
                       _buildId.ShortDebugString().c_str());
                return false;
            }
            auto it = firstDs.find(SOURCE_INDEX_ADMIN_ADDRESS);
            if (it == firstDs.end()) {
                BS_LOG(ERROR, "cannot find [%s] for src[%s] in generation task [%s].",
                       SOURCE_INDEX_ADMIN_ADDRESS.c_str(), INDEX_CLONE_SRC.c_str(),
                       _buildId.ShortDebugString().c_str());
                return false;
            }
            kvMap[SOURCE_INDEX_ADMIN_ADDRESS] = it->second;

            it = firstDs.find(SOURCE_INDEX_BUILD_ID);
            if (it == firstDs.end()) {
                BS_LOG(ERROR, "cannot find [%s] for src[%s] in generation task [%s].", SOURCE_INDEX_BUILD_ID.c_str(),
                       INDEX_CLONE_SRC.c_str(), _buildId.ShortDebugString().c_str());
                return false;
            }

            kvMap[SOURCE_INDEX_BUILD_ID] = it->second;

            it = firstDs.find(MADROX_ADMIN_ADDRESS);
            if (it == firstDs.end()) {
                BS_LOG(ERROR, "cannot find [%s] for src[%s] in generation task [%s].", MADROX_ADMIN_ADDRESS.c_str(),
                       INDEX_CLONE_SRC.c_str(), _buildId.ShortDebugString().c_str());
                return false;
            }
            kvMap[MADROX_ADMIN_ADDRESS] = it->second;
            kvMap["needCloneIndex"] = "true";
        }
    }

    KeyValueMap schemaIdMap;
    for (auto cluster : clusters) {
        auto schema = resourceReader->getTabletSchema(cluster);
        if (!schema) {
            REPORT(SERVICE_ERROR_CONFIG, "read [%s] schema failed", cluster.c_str());
            return false;
        }
        schemaIdMap[cluster] = StringUtil::toString(schema->GetSchemaId());
    }

    kvMap["dataDescriptions"] = ToJsonString(dsVec, true);
    kvMap["realtimeDataDescription"] = ToJsonString(_realTimeDataDesc, true);
    if (_realTimeDataDesc.empty()) {
        kvMap["hasRealtimeDataDesc"] = "false";
    } else {
        kvMap["hasRealtimeDataDesc"] = "true";
    }
    kvMap["buildId"] = ProtoUtil::buildIdToStr(_buildId);
    kvMap["buildStep"] = buildStep == proto::BUILD_STEP_FULL ? config::BUILD_STEP_FULL_STR : config::BUILD_STEP_INC_STR;
    kvMap["clusterNames"] = ToJsonString(clusters, true);
    kvMap["schemaIdMap"] = ToJsonString(schemaIdMap, true);
    kvMap["configPath"] = configPath;
    if (_isImportedTask) {
        kvMap["useRandomInitialPathVersion"] = "true";
        KeyValueMap tmpIdStrMap;
        for (const auto& [clusterName, versionId] : _importedVersionIdMap) {
            tmpIdStrMap[clusterName] = autil::StringUtil::toString(versionId);
        }
        kvMap["importedVersionIdMap"] = ToJsonString(tmpIdStrMap, true);
    }
    config::ControlConfig controlConfig;
    if (!resourceReader->getDataTableConfigWithJsonPath(_buildId.datatable(), "control_config", controlConfig)) {
        BS_LOG(ERROR, "get control_config.is_inc_processor_exist failed from dataTable[%s] failed",
               _buildId.datatable().c_str());
        return false;
    }
    if (_batchMode && buildStep == BUILD_STEP_INC) {
        REPORT(SERVICE_ERROR_CONFIG, "batch mode not allow start from inc step, buildId: %s",
               _buildId.ShortDebugString().c_str());
        return false;
    }
    string targetGraph;
    GraphConfig graphConfig;
    if (resourceReader->getGraphConfig(graphConfig)) {
        targetGraph = graphConfig.getGraphName();
        const KeyValueMap& params = graphConfig.getGraphParam();
        vector<string> graphParamKeys;
        auto iter = params.begin();
        for (; iter != params.end(); iter++) {
            kvMap[iter->first] = iter->second;
            graphParamKeys.push_back(iter->first);
        }
        kvMap["user_define_parameter_keys"] = ToJsonString(graphParamKeys, true);
    } else {
        if (controlConfig.getDataLinkMode() == ControlConfig::DataLinkMode::NPC_MODE) {
            targetGraph = "BuildIndexV2.npc_mode.graph";
        } else {
            if (controlConfig.useIndexV2() || _forceUseTabletV2Build) {
                targetGraph = _batchMode ? "BatchBuildV2/FullBatchBuild.graph" : "BuildIndexV2.graph";
            } else {
                targetGraph = _batchMode ? "BatchBuild/FullBatchBuild.graph" : "BuildIndex.graph";
            }
        }
    }
    if (!DataLinkModeUtil::addGraphParameters(controlConfig, clusters, dsVec, &kvMap)) {
        REPORT(SERVICE_ERROR_CONFIG, "parse graph parameters failed, buildId: %s", _buildId.ShortDebugString().c_str());
        return false;
    }
    if (!prepareCheckpointSynchronizer(resourceReader)) {
        REPORT(SERVICE_ERROR_CONFIG, "init checkpoint synchronizer [%s] failed", _buildId.ShortDebugString().c_str());
        return false;
    }
    if (!_taskFlowManager->loadSubGraph("", targetGraph, kvMap)) {
        REPORT(SERVICE_ERROR_CONFIG, "init [%s] graph failed", _buildId.ShortDebugString().c_str());
        return false;
    }
    _taskFlowManager->stepRun();

    BuildServiceConfig buildServiceConfig;
    if (!resourceReader->getConfigWithJsonPath("build_app.json", "", buildServiceConfig)) {
        string errorMsg = "failed to get build service config in config[" + getConfigPath() + "]";
        REPORT(SERVICE_ERROR_CONFIG, "%s", errorMsg.c_str());
        return false;
    }
    _counterConfig = buildServiceConfig.counterConfig;
    _currentProcessorStep = buildStep;
    _dataDescriptionsStr = ToJsonString(dsVec, true);
    bool hasError = false;
    bool v2Build = isV2Build(resourceReader, hasError);
    if (hasError) {
        REPORT(SERVICE_ERROR_CONFIG, "read config failed, reject update config");
        return false;
    }
    _isV2Build = v2Build;
    return true;
}