aios/storage/indexlib/index/ann/aitheta2/impl/SegmentMeta.cpp (128 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "indexlib/index/ann/aitheta2/impl/SegmentMeta.h"
#include "indexlib/index/ann/aitheta2/util/parallel_merge/ParallelMergeUtil.h"
#include "indexlib/index/ann/aitheta2/util/parallel_merge/ParallelReduceMeta.h"
using namespace std;
using namespace autil::legacy;
using namespace indexlib::file_system;
namespace indexlibv2::index::ann {
// File-local string constants. An unnamed namespace gives them internal
// linkage, so they are visible only inside this translation unit.
namespace {
// Name of the file holding the serialized segment meta.
constexpr const char* SEGMENT_META_FILE = "aitheta.segment.meta";
// Human-readable segment type names written next to the numeric type.
constexpr const char* UNKNOWN_SEGMENT = "unknown";
constexpr const char* NORMAL_SEGMENT = "normal";
constexpr const char* REALTIME_SEGMENT = "realtime";
// Default value of the "stats_type" field of TrainStats / BuildStats.
constexpr const char* STATS_TYPE_JSON_STRING = "json_string";
} // namespace
// Default-constructs train stats: the type tag is "json_string" and the
// payload is an empty JSON object literal.
TrainStats::TrainStats()
    : statsType(STATS_TYPE_JSON_STRING)
    , stats("{}")
{
}
// Maps the two TrainStats fields to the JSON keys "stats_type" and "stats".
// Each call supplies a default ("json_string" and "{}" respectively).
void TrainStats::Jsonize(JsonWrapper& json)
{
    const std::string defaultStatsType = STATS_TYPE_JSON_STRING;
    json.Jsonize("stats_type", statsType, defaultStatsType);

    const std::string defaultStats = "{}";
    json.Jsonize("stats", stats, defaultStats);
}
// Default-constructs build stats: the type tag is "json_string" and the
// payload is an empty JSON object literal.
BuildStats::BuildStats()
    : statsType(STATS_TYPE_JSON_STRING)
    , stats("{}")
{
}
// Maps the two BuildStats fields to the JSON keys "stats_type" and "stats".
// Each call supplies a default ("json_string" and "{}" respectively).
void BuildStats::Jsonize(JsonWrapper& json)
{
    const std::string defaultStatsType = STATS_TYPE_JSON_STRING;
    json.Jsonize("stats_type", statsType, defaultStatsType);

    const std::string defaultStats = "{}";
    json.Jsonize("stats", stats, defaultStats);
}
// Maps IndexMeta to JSON. "doc_count", "builder_name" and "searcher_name" are
// passed without a default; "train_stats" and "build_stats" are passed with a
// default-constructed stats object as the default.
void IndexMeta::Jsonize(JsonWrapper& json)
{
    json.Jsonize("doc_count", docCount);
    json.Jsonize("builder_name", builderName);
    json.Jsonize("searcher_name", searcherName);

    const TrainStats defaultTrainStats;
    json.Jsonize("train_stats", trainStats, defaultTrainStats);

    const BuildStats defaultBuildStats;
    json.Jsonize("build_stats", buildStats, defaultBuildStats);
}
// Returns true when `directory` contains the segment meta file itself, or,
// failing that, when a parallel-merge meta can be loaded from it.
bool SegmentMeta::IsExist(const indexlib::file_system::DirectoryPtr& directory)
{
    if (directory->IsExist(SEGMENT_META_FILE)) {
        return true;
    }
    // The parallel-merge probe runs only when the meta file is absent.
    return HasParallelMergeMeta(directory);
}
// Reports whether a ParallelReduceMeta can be loaded from `directory`.
// The loaded object is a throwaway probe; only the load result is used.
bool SegmentMeta::HasParallelMergeMeta(const indexlib::file_system::DirectoryPtr& directory)
{
    ParallelReduceMeta probe;
    const bool loaded = probe.Load(directory);
    return loaded;
}
// Loads the segment meta from `directory`.
//
// If the meta file is present it is loaded directly. Otherwise the directory
// is treated as the output of a parallel merge: the ParallelReduceMeta is
// loaded, the per-task directories are listed, the first one is loaded into
// this object and every later one is loaded into a temporary and merged in.
//
// Returns the result of DoLoad() on the direct path, and true on the
// parallel-merge path once every step succeeded. ANN_CHECK presumably logs
// and returns false on failure -- confirm against its definition.
//
// NOTE(review): when no parallel-merge directories are returned, nothing is
// loaded and the function still returns true -- confirm this is intended.
bool SegmentMeta::Load(const indexlib::file_system::DirectoryPtr& directory)
{
    if (directory->IsExist(SEGMENT_META_FILE)) {
        return DoLoad(directory);
    }

    ParallelReduceMeta reduceMeta;
    ANN_CHECK(reduceMeta.Load(directory), "load meta failed");

    vector<DirectoryPtr> mergeDirs;
    ParallelMergeUtil::GetParallelMergeDirs(directory, reduceMeta, mergeDirs);

    bool isFirst = true;
    for (const auto& mergeDir : mergeDirs) {
        if (isFirst) {
            // The first directory seeds this object's state.
            isFirst = false;
            ANN_CHECK(DoLoad(mergeDir), "load meta failed");
            continue;
        }
        // Later directories are loaded separately and folded in.
        SegmentMeta partial;
        ANN_CHECK(partial.DoLoad(mergeDir), "load meta failed");
        ANN_CHECK(Merge(partial), "merge meta failed");
    }
    return true;
}
// Reads the segment meta file from `directory` and deserializes it into this
// object.
//
// The file is opened with an FSOT_MMAP reader option with mayNonExist set,
// its mapped bytes are copied into a string, and that string is parsed as
// JSON. Returns false when an autil::legacy::ExceptionBase is caught; the
// reader check goes through ANN_CHECK (presumably log + return false).
bool SegmentMeta::DoLoad(const indexlib::file_system::DirectoryPtr& directory)
{
    try {
        indexlib::file_system::ReaderOption option(FSOT_MMAP);
        option.mayNonExist = true;

        auto fileReader = directory->CreateFileReader(SEGMENT_META_FILE, option);
        ANN_CHECK(fileReader, "create failed");

        // Copy the file bytes out before the reader is closed.
        const auto* begin = static_cast<const char*>(fileReader->GetBaseAddress());
        const string jsonText(begin, fileReader->GetLength());
        FromJsonString(*this, jsonText);

        fileReader->Close().GetOrThrow();
        AUTIL_LOG(INFO, "segment meta load from[%s]", fileReader->DebugString().c_str());
    } catch (const autil::legacy::ExceptionBase& e) {
        AUTIL_LOG(ERROR, "load failed, error[%s]", e.what());
        return false;
    }
    return true;
}
// Folds `segmentMeta` into this object.
//
// Both metas must have the same dimension. The segment size becomes the sum
// of both sizes, and each index meta of `segmentMeta` is added through
// AddIndexMeta(). Returns true when every step succeeded; the log text
// indicates AddIndexMeta() fails on a repeated index id.
//
// NOTE(review): the size is updated before the index metas are added, so a
// failing AddIndexMeta() leaves this object partially merged.
bool SegmentMeta::Merge(const SegmentMeta& segmentMeta)
{
    ANN_CHECK(segmentMeta.GetDimension() == _dimension, "dimension mismatch");

    const auto mergedSize = _segmentDataSize + segmentMeta.GetSegmentSize();
    SetSegmentSize(mergedSize);

    for (const auto& entry : segmentMeta.GetIndexMetaMap()) {
        ANN_CHECK(AddIndexMeta(entry.first, entry.second),
                  "merge index meta failed, index id[%lu] repeated in segment meta", entry.first);
    }
    return true;
}
// Serializes this object to JSON and writes it to the segment meta file in
// `directory`.
//
// Any existing meta file is removed first (a missing file is tolerated), then
// a new writer is created, the JSON text is written and the writer is closed.
// Returns false when an autil::legacy::ExceptionBase is caught; the writer
// check goes through ANN_CHECK (presumably log + return false).
bool SegmentMeta::Dump(const indexlib::file_system::DirectoryPtr& directory)
{
    try {
        directory->RemoveFile(SEGMENT_META_FILE, RemoveOption::MayNonExist());

        auto fileWriter = directory->CreateFileWriter(SEGMENT_META_FILE);
        ANN_CHECK(fileWriter, "create writer failed");

        const string jsonText = ToJsonString(*this);
        fileWriter->Write(jsonText.data(), jsonText.size()).GetOrThrow();
        fileWriter->Close().GetOrThrow();

        AUTIL_LOG(INFO, "segment meta dump to[%s]", fileWriter->DebugString().c_str());
    } catch (const autil::legacy::ExceptionBase& e) {
        AUTIL_LOG(ERROR, "dump failed, error[%s]", e.what());
        return false;
    }
    return true;
}
// Maps SegmentMeta to JSON.
//
// "segment_type" is handled first, so the type name below is derived from
// the value _segmentType holds after that call. "segment_type_string" is
// bound to a local that is not read afterwards, so any value written into it
// is discarded. "is_merged_segment" uses the member's current value as its
// default.
void SegmentMeta::Jsonize(JsonWrapper& json)
{
    json.Jsonize("segment_type", _segmentType);

    // Anything other than normal / realtime is reported as "unknown".
    string typeName = (_segmentType == ST_NORMAL)     ? NORMAL_SEGMENT
                      : (_segmentType == ST_REALTIME) ? REALTIME_SEGMENT
                                                      : UNKNOWN_SEGMENT;
    json.Jsonize("segment_type_string", typeName);

    json.Jsonize("doc_count", _docCount);
    json.Jsonize("index_count", _indexCount);
    json.Jsonize("segment_data_size", _segmentDataSize);
    json.Jsonize("dimension", _dimension);
    json.Jsonize("index_info", _indexMetaMap);
    json.Jsonize("is_merged_segment", _isMergedSegment, _isMergedSegment);
}
// Logger setup for SegmentMeta, passed the category "indexlib.index".
// NOTE(review): presumably defines the logger used by the AUTIL_LOG calls in
// this file -- confirm against the macro definition.
AUTIL_LOG_SETUP(indexlib.index, SegmentMeta);
// Converts a SegmentType to its display name by using the enum's numeric
// value as an index into a fixed name table.
//
// @param type segment type to describe.
// @return "Unknown", "NormalIndex" or "RealtimeIndex" for values 0, 1 and 2.
//         Any other value (negative or past the end of the table) also yields
//         "Unknown" instead of indexing out of bounds, which was undefined
//         behaviour before.
std::string SegmentType2Str(const SegmentType& type)
{
    static const std::vector<std::string> kType = {"Unknown", "NormalIndex", "RealtimeIndex"};
    const auto index = static_cast<int16_t>(type);
    // std::vector::operator[] is unchecked, so validate the index first.
    if (index < 0 || static_cast<size_t>(index) >= kType.size()) {
        return kType.front();
    }
    return kType[static_cast<size_t>(index)];
}
} // namespace indexlibv2::index::ann