aios/storage/indexlib/index/ann/aitheta2/util/AithetaFactoryWrapper.cpp (139 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "indexlib/index/ann/aitheta2/util/AithetaFactoryWrapper.h"
#include "indexlib/index/ann/aitheta2/util/CustomizedAithetaContainer.h"
#include "indexlib/index/ann/aitheta2/util/params_initializer/ParamsInitializerFactory.h"
using namespace std;
using namespace autil;
using namespace aitheta2;
namespace indexlibv2::index::ann {
// Definition of the class-wide static spin lock. CreateStorage() holds it while it
// picks a storage path and opens the storage under that path.
autil::SpinLock AiThetaFactoryWrapper::lock;
bool AiThetaFactoryWrapper::CreateBuilder(const AithetaIndexConfig& config, size_t docCount, AiThetaBuilderPtr& builder)
{
string builderName = config.buildConfig.builderName;
auto intializer = ParamsInitializerFactory::Create(builderName, docCount);
ANN_CHECK(intializer, "create parameter initializer failed");
AiThetaMeta meta;
ANN_CHECK(intializer->InitAiThetaMeta(config, meta), "init failed");
// 对于图算法,proxima没有支持mips转换, 因此使用球面距离
if ((builderName == HNSW_BUILDER || builderName == QGRAPH_BUILDER) && meta.measure_name() == INNER_PRODUCT) {
meta.set_measure(MIPS_SQUARED_EUCLIDEAN, 0, AiThetaParams());
AUTIL_LOG(INFO, "update distance type from %s to %s", INNER_PRODUCT.c_str(), MIPS_SQUARED_EUCLIDEAN.c_str());
}
AiThetaParams params;
ANN_CHECK(intializer->InitNormalBuildParams(config, params), "init failed");
builder = AiThetaFactory::CreateBuilder(builderName);
ANN_CHECK(builder != nullptr, "create failed");
ANN_CHECK_OK(builder->init(meta, params), "init failed");
AUTIL_LOG(INFO, "create index builder[%s] success", builderName.c_str());
return true;
}
bool AiThetaFactoryWrapper::CreateReducer(const AithetaIndexConfig& config, AiThetaReducerPtr& reducer)
{
string reducerName = "";
if (config.buildConfig.builderName == HNSW_BUILDER) {
reducerName = "HnswDistributedReducer";
} else if (config.buildConfig.builderName == QGRAPH_BUILDER) {
reducerName = "QGraphDistributedReducer";
}
auto intializer = ParamsInitializerFactory::Create(config.buildConfig.builderName, 0);
ANN_CHECK(intializer, "create parameter initializer failed");
AiThetaParams params;
params.set("proxima.hnsw.distributed_reducer.num_of_prune_threads", 10);
ANN_CHECK(intializer->InitNormalBuildParams(config, params), "init failed");
reducer = AiThetaFactory::CreateReducer(reducerName);
ANN_CHECK(reducer != nullptr, "create failed");
ANN_CHECK_OK(reducer->init(params), "init failed");
AUTIL_LOG(INFO, "create index reducer[%s] success", reducerName.c_str());
return true;
}
bool AiThetaFactoryWrapper::CreateSearcher(const AithetaIndexConfig& config, const IndexMeta& indexMeta,
const IndexDataReaderPtr& reader, AiThetaSearcherPtr& searcher)
{
auto& searcherName = indexMeta.searcherName;
size_t docCount = indexMeta.docCount;
auto initializer = ParamsInitializerFactory::Create(searcherName, docCount);
ANN_CHECK(initializer != nullptr, "create parameter initializer failed");
AiThetaParams params;
AithetaIndexConfig newConfig = config;
if (indexMeta.builderName == OSWG_STREAMER || indexMeta.builderName == QC_STREAMER) {
newConfig.searchConfig.indexParams = config.realtimeConfig.indexParams;
}
ANN_CHECK(initializer->InitNormalSearchParams(newConfig, params), "init failed");
auto container = std::make_shared<CustomizedAiThetaContainer>(reader);
ANN_CHECK_OK(container->init(params), "container init failed");
ANN_CHECK_OK(container->load(), "container load failed");
searcher = AiThetaFactory::CreateSearcher(searcherName);
ANN_CHECK(searcher != nullptr, "create searcher[%s] failed", searcherName.c_str());
ANN_CHECK_OK(searcher->init(params), "searcher init failed");
ANN_CHECK_OK(searcher->load(container, aitheta2::IndexMeasure::Pointer()), "searcher load failed");
return true;
}
/**
 * Creates, initializes and opens a realtime streamer.
 *
 * The streamer starts "warm" only when `resource` is provided and the normal index was
 * built by the builder matching the configured streamer (QGRAPH_STREAMER with
 * QGRAPH_BUILDER, or QC_STREAMER with QC_BUILDER); in that case it is initialized from
 * resource->normalIndexDataReader. Otherwise it is a cold start and the streamer is
 * initialized from a freshly built AiThetaMeta.
 *
 * On cold start only HNSW_STREAMER and OSWG_STREAMER are kept as configured; any other
 * streamer falls back to OSWG_STREAMER.
 *
 * @param config    index configuration; supplies the streamer name and realtime params.
 * @param resource  optional realtime build resource; may be null.
 * @param streamer  [out] receives the created streamer.
 * @return true when every step succeeded. Failed ANN_CHECK / ANN_CHECK_OK checks are
 *         expected to log and return false (the macros are defined outside this file).
 */
bool AiThetaFactoryWrapper::CreateStreamer(const AithetaIndexConfig& config,
                                           const std::shared_ptr<RealtimeIndexBuildResource>& resource,
                                           AiThetaStreamerPtr& streamer)
{
    string streamerName = config.realtimeConfig.streamerName;
    bool isColdStart = true;
    bool hasMultiIndex = false;
    if (resource != nullptr) {
        const std::string& builderName = resource->normalIndexMeta.builderName;
        if (streamerName == QGRAPH_STREAMER && builderName == QGRAPH_BUILDER) {
            isColdStart = false;
        } else if (streamerName == QC_STREAMER && builderName == QC_BUILDER) {
            isColdStart = false;
        }
        // In a multi-category index, indexId is never kDefaultIndexId.
        hasMultiIndex = resource->indexId != kDefaultIndexId;
    }

    // Fall back to OSWG only when the configured streamer is neither HNSW nor OSWG.
    // The two inequalities must be joined with &&: joined with || the test is always
    // true, which overrode HNSW_STREAMER and logged an "X to X" update for OSWG.
    if (isColdStart && streamerName != HNSW_STREAMER && streamerName != OSWG_STREAMER) {
        AUTIL_LOG(INFO, "update streamer from %s to %s", streamerName.c_str(), OSWG_STREAMER.c_str());
        streamerName = OSWG_STREAMER;
    }

    auto initializer = ParamsInitializerFactory::Create(streamerName);
    ANN_CHECK(initializer, "create parameter initializer failed");

    // Build and search parameters are initialized separately and merged into one set.
    AiThetaParams params;
    ANN_CHECK(initializer->InitRealtimeBuildParams(config, params, hasMultiIndex), "init failed");
    AiThetaParams searchParams;
    ANN_CHECK(initializer->InitRealtimeSearchParams(config, searchParams), "init failed");
    params.merge(searchParams);

    streamer = AiThetaFactory::CreateStreamer(streamerName);
    ANN_CHECK(streamer != nullptr, "create streamer %s failed", streamerName.c_str());

    if (isColdStart) {
        AiThetaMeta meta;
        ANN_CHECK(initializer->InitAiThetaMeta(config, meta), "init failed");
        // On cold start the MIPS conversion cannot be used (there is no global norm),
        // so the spherical-distance measure is used in place of inner product.
        if (meta.measure_name() == INNER_PRODUCT) {
            meta.set_measure(MIPS_SQUARED_EUCLIDEAN, 0, AiThetaParams());
            AUTIL_LOG(INFO, "update distance type from %s to %s", INNER_PRODUCT.c_str(),
                      MIPS_SQUARED_EUCLIDEAN.c_str());
        }
        ANN_CHECK_OK(streamer->init(meta, params), "init failed");
    } else {
        // isColdStart is only cleared inside the `resource != nullptr` branch above,
        // so dereferencing resource here is safe.
        auto reader = resource->normalIndexDataReader;
        auto container = std::make_shared<CustomizedAiThetaContainer>(reader);
        ANN_CHECK_OK(container->init(params), "container init failed");
        ANN_CHECK_OK(container->load(), "container load failed");
        ANN_CHECK_OK(streamer->init(container, params), "streamer init failed");
    }

    AiThetaStoragePtr storage;
    ANN_CHECK(CreateStorage(TMP_MEM_STORAGE, params, storage), "create storage failed");
    ANN_CHECK_OK(streamer->open(storage), "streamer open storage failed");
    return true;
}
bool AiThetaFactoryWrapper::CreateStorage(const std::string& name, const AiThetaParams& params,
AiThetaStoragePtr& storage)
{
storage = AiThetaFactory::CreateStorage(name);
ANN_CHECK(storage != nullptr, "create[%s] storage failed", name.c_str());
ANN_CHECK_OK(storage->init(params), "init failed");
do {
autil::ScopedSpinLock lock(AiThetaFactoryWrapper::lock);
string path = StringUtil::toString(TimeUtility::currentTime());
auto instance = aitheta2::IndexMemory::Instance();
ANN_CHECK(instance != nullptr, "get index memory instance failed");
if (instance->has(path)) {
continue;
}
ANN_CHECK_OK(storage->open(path, true), "open storage failed");
break;
} while (true);
return true;
}
// Logger setup for AiThetaFactoryWrapper under the "indexlib.index" name; presumably
// defines the logger that the AUTIL_LOG calls in this file write to.
AUTIL_LOG_SETUP(indexlib.index, AiThetaFactoryWrapper);
} // namespace indexlibv2::index::ann