in aios/storage/indexlib/table/common/CommonVersionImporter.cpp [121:281]
Status CommonVersionImporter::Import(const std::vector<framework::Version>& versions, const framework::Fence* fence,
const framework::ImportOptions& options, framework::Version* baseVersion)
{
if (fence == nullptr) {
auto status = Status::InvalidArgs("Fence Description is nullptr!");
AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
return status;
}
// 1 check input versions
std::vector<framework::Version> validVersions;
auto status = Check(versions, baseVersion, options, &validVersions);
RETURN_IF_STATUS_ERROR(status, "version import check failed");
// 2 calculate new locator
framework::Locator resultLocator = baseVersion->GetLocator();
uint32_t formatVersion = 0;
int64_t resultMaxTs = 0;
for (const auto& version : validVersions) {
auto versionLocator = version.GetLocator();
if (!resultLocator.IsSameSrc(versionLocator, false)) {
if (options.GetImportStrategy() == KEEP_SEGMENT_OVERWRITE_LOCATOR) {
resultLocator = versionLocator;
}
} else {
if (!resultLocator.Update(version.GetLocator())) {
AUTIL_LOG(ERROR,
"locator update failed when import version[%d] with locator[%s]."
" base locator is [%s]. current result locator is [%s]",
version.GetVersionId(), version.GetLocator().DebugString().c_str(),
baseVersion->GetLocator().DebugString().c_str(), resultLocator.DebugString().c_str());
return Status::InternalError();
}
}
if (version.GetFormatVersion() > formatVersion) {
formatVersion = version.GetFormatVersion();
resultMaxTs = version.GetTimestamp();
} else {
resultMaxTs = std::max(resultMaxTs, version.GetTimestamp());
}
}
// 3 import new segments
std::vector<std::shared_ptr<framework::SegmentDescriptions>> otherSegDescs;
std::set<segmentid_t> newSegments;
// rm version file in logical fs at the end to avoid conflict
std::set<versionid_t> mountedVersionIds;
bool removeVersionRet = true;
{
autil::ScopeGuard sg([&mountedVersionIds, &fence, &removeVersionRet]() {
for (const auto& versionId : mountedVersionIds) {
std::string versionFileName = VERSION_FILE_NAME_PREFIX + std::string(".") + std::to_string(versionId);
indexlib::file_system::RemoveOption removeOption;
removeOption.logicalDelete = true;
removeOption.mayNonExist = true;
auto status = fence->GetFileSystem()->RemoveFile(versionFileName, removeOption).Status();
if (!status.IsOK()) {
removeVersionRet = false;
AUTIL_LOG(ERROR, "logical remove file %s failed", versionFileName.c_str());
} else {
AUTIL_LOG(INFO, "logical remove file %s", versionFileName.c_str());
}
}
});
auto root = indexlib::file_system::Directory::Get(fence->GetFileSystem());
// check if each input segment's locator is ahead of baseVersion's locator
for (const auto& validVersion : validVersions) {
const std::string fenceRoot = PathUtil::JoinPath(fence->GetGlobalRoot(), validVersion.GetFenceName());
indexlib::file_system::MountOption mountOption(indexlib::file_system::FSMT_READ_ONLY);
mountOption.conflictResolution = indexlib::file_system::ConflictResolution::CHECK_DIFF;
auto status = fence->GetFileSystem()
->MountVersion(fenceRoot, validVersion.GetVersionId(),
/*logicalPath=*/"", mountOption, nullptr)
.Status();
RETURN_IF_STATUS_ERROR(status, "mount import version[%d] fence[%s] failed", validVersion.GetVersionId(),
validVersion.GetFenceName().c_str());
mountedVersionIds.insert(validVersion.GetVersionId());
otherSegDescs.push_back(validVersion.GetSegmentDescriptions());
auto importVersionLocator = validVersion.GetLocator();
for (auto [segmentId, schemaId] : validVersion) {
if (baseVersion->HasSegment(segmentId)) {
continue;
}
auto segDir = root->GetDirectory(validVersion.GetSegmentDirName(segmentId),
/*throwExceptionIfNotExist=*/false);
if (!segDir) {
auto status = Status::IOError("get segment[%d] dir failed", segmentId);
AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
return status;
}
auto [status, segLocator] = GetSegmentLocator(segDir->GetIDirectory());
RETURN_IF_STATUS_ERROR(status, "get segment[%d] locator failed", segmentId);
if (!importVersionLocator.IsSameSrc(segLocator, false)) {
// segment src必须和version src保持一致才会被import进去
//这个表明,这个segment是和这个version同源的,需要被import进去
//如果导入和version src不同源的segment,会出现bug,比如:
//全量segment 0和增量segment 1不同源
//全量segment 0和1被merge成2后,如果重复import旧的全量segment 0,依然会被import进来
continue;
}
framework::Locator baseLocator = baseVersion->GetLocator();
if (!baseLocator.IsSameSrc(segLocator, false)) {
auto importStrategy = options.GetImportStrategy();
if (importStrategy == KEEP_SEGMENT_IGNORE_LOCATOR ||
importStrategy == KEEP_SEGMENT_OVERWRITE_LOCATOR) {
baseVersion->AddSegment(segmentId, schemaId);
newSegments.insert(segmentId);
} else {
AUTIL_LOG(ERROR, "segment locator src not same with base version locator src");
return Status::InternalError();
}
} else {
if (baseLocator.IsFasterThan(segLocator, false) !=
framework::Locator::LocatorCompareResult::LCR_FULLY_FASTER) {
baseVersion->AddSegment(segmentId, schemaId);
newSegments.insert(segmentId);
} else {
AUTIL_LOG(INFO, "filter segment, fence:%s segId:%d", validVersion.GetFenceName().c_str(),
segmentId);
}
}
}
otherSegDescs.push_back(validVersion.GetSegmentDescriptions());
std::string versionFileName =
VERSION_FILE_NAME_PREFIX + std::string(".") + std::to_string(validVersion.GetVersionId());
indexlib::file_system::RemoveOption removeOption;
removeOption.logicalDelete = true;
status = fence->GetFileSystem()->RemoveFile(versionFileName, removeOption).Status();
if (!status.IsOK()) {
AUTIL_LOG(ERROR, "logical remove file %s failed", versionFileName.c_str());
return status;
} else {
AUTIL_LOG(INFO, "logical remove file %s", versionFileName.c_str());
}
baseVersion->MergeDescriptions(validVersion);
}
// 4 import Segment Descriptions
auto baseSegDescs = baseVersion->GetSegmentDescriptions();
if (baseSegDescs == nullptr) {
auto status = Status::InternalError("base segment descriptions is nullptr");
AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
return status;
}
auto status = baseSegDescs->Import(otherSegDescs, newSegments);
if (!status.IsOK()) {
AUTIL_LOG(ERROR, "segment descriptions import failed");
return status;
}
if (resultMaxTs > baseVersion->GetTimestamp()) {
baseVersion->SetTimestamp(resultMaxTs);
}
// TODO(yonghao.fyh) : consider partial faster
baseVersion->SetLocator(resultLocator);
}
if (!removeVersionRet) {
return Status::IOError();
}
return Status::OK();
}