Status CommonVersionImporter::Import()

in aios/storage/indexlib/table/common/CommonVersionImporter.cpp [121:281]


// Imports the segments referenced by `versions` into `baseVersion`.
//
// Steps:
//   1. Check() filters `versions` down to the ones that may be imported.
//   2. The resulting locator and timestamp are computed from the valid versions.
//   3. Every valid version is mounted read-only into the fence's file system and each of its
//      segments that is not yet in `baseVersion` is added when its locator qualifies.
//   4. The segment descriptions of the valid versions are imported into `baseVersion`.
//   5. The mounted version files are logically removed once, when the import scope is left
//      (also on early error returns), by a scope guard.
//
// @param versions     candidate versions to import.
// @param fence        fence providing the file system and global root; must not be nullptr.
// @param options      import options; the import strategy decides how a locator source mismatch is handled.
// @param baseVersion  in/out version that receives segments, descriptions, timestamp and locator;
//                     must not be nullptr.
// @return OK on success; InvalidArgs for a null `fence`/`baseVersion`; otherwise the first failing status.
//         NOTE: on a failure after step 3 has started, `baseVersion` may already contain newly added segments.
Status CommonVersionImporter::Import(const std::vector<framework::Version>& versions, const framework::Fence* fence,
                                     const framework::ImportOptions& options, framework::Version* baseVersion)
{
    if (fence == nullptr) {
        auto status = Status::InvalidArgs("Fence Description is nullptr!");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    // baseVersion is dereferenced unconditionally below, so reject nullptr up front just like fence.
    if (baseVersion == nullptr) {
        auto status = Status::InvalidArgs("base version is nullptr!");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    // 1 check input versions
    std::vector<framework::Version> validVersions;
    auto status = Check(versions, baseVersion, options, &validVersions);
    RETURN_IF_STATUS_ERROR(status, "version import check failed");

    // 2 calculate new locator
    framework::Locator resultLocator = baseVersion->GetLocator();
    uint32_t formatVersion = 0;
    int64_t resultMaxTs = 0;
    for (const auto& version : validVersions) {
        auto versionLocator = version.GetLocator();
        if (!resultLocator.IsSameSrc(versionLocator, false)) {
            // Different source: the result locator is only replaced when the strategy asks for it.
            if (options.GetImportStrategy() == KEEP_SEGMENT_OVERWRITE_LOCATOR) {
                resultLocator = versionLocator;
            }
        } else {
            if (!resultLocator.Update(versionLocator)) {
                AUTIL_LOG(ERROR,
                          "locator update failed when import version[%d] with locator[%s]."
                          "  base locator is [%s]. current result locator is [%s]",
                          version.GetVersionId(), versionLocator.DebugString().c_str(),
                          baseVersion->GetLocator().DebugString().c_str(), resultLocator.DebugString().c_str());
                return Status::InternalError();
            }
        }
        // A version with a higher format version resets the candidate timestamp; otherwise the
        // maximum timestamp seen so far is kept.
        if (version.GetFormatVersion() > formatVersion) {
            formatVersion = version.GetFormatVersion();
            resultMaxTs = version.GetTimestamp();
        } else {
            resultMaxTs = std::max(resultMaxTs, version.GetTimestamp());
        }
    }

    // 3 import new segments
    std::vector<std::shared_ptr<framework::SegmentDescriptions>> otherSegDescs;
    std::set<segmentid_t> newSegments;
    // rm version file in logical fs at the end to avoid conflict
    std::set<versionid_t> mountedVersionIds;
    bool removeVersionRet = true;
    {
        // The guard is the single place where mounted version files are logically removed. It is
        // expected to fire when this block is left, which covers early error returns as well.
        autil::ScopeGuard sg([&mountedVersionIds, &fence, &removeVersionRet]() {
            for (const auto& versionId : mountedVersionIds) {
                std::string versionFileName = VERSION_FILE_NAME_PREFIX + std::string(".") + std::to_string(versionId);
                indexlib::file_system::RemoveOption removeOption;
                removeOption.logicalDelete = true;
                removeOption.mayNonExist = true;
                auto status = fence->GetFileSystem()->RemoveFile(versionFileName, removeOption).Status();
                if (!status.IsOK()) {
                    removeVersionRet = false;
                    AUTIL_LOG(ERROR, "logical remove file %s failed", versionFileName.c_str());
                } else {
                    AUTIL_LOG(INFO, "logical remove file %s", versionFileName.c_str());
                }
            }
        });
        auto root = indexlib::file_system::Directory::Get(fence->GetFileSystem());

        // check if each input segment's locator is ahead of baseVersion's locator
        for (const auto& validVersion : validVersions) {
            const std::string fenceRoot = PathUtil::JoinPath(fence->GetGlobalRoot(), validVersion.GetFenceName());
            indexlib::file_system::MountOption mountOption(indexlib::file_system::FSMT_READ_ONLY);
            mountOption.conflictResolution = indexlib::file_system::ConflictResolution::CHECK_DIFF;
            auto status = fence->GetFileSystem()
                              ->MountVersion(fenceRoot, validVersion.GetVersionId(),
                                             /*logicalPath=*/"", mountOption, nullptr)
                              .Status();
            RETURN_IF_STATUS_ERROR(status, "mount import version[%d] fence[%s] failed", validVersion.GetVersionId(),
                                   validVersion.GetFenceName().c_str());
            // Record the version exactly once: its file is removed by the guard, and its segment
            // descriptions are handed to SegmentDescriptions::Import() in step 4.
            mountedVersionIds.insert(validVersion.GetVersionId());
            otherSegDescs.push_back(validVersion.GetSegmentDescriptions());
            auto importVersionLocator = validVersion.GetLocator();
            for (auto [segmentId, schemaId] : validVersion) {
                if (baseVersion->HasSegment(segmentId)) {
                    continue;
                }
                auto segDir = root->GetDirectory(validVersion.GetSegmentDirName(segmentId),
                                                 /*throwExceptionIfNotExist=*/false);
                if (!segDir) {
                    auto status = Status::IOError("get segment[%d] dir failed", segmentId);
                    AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
                    return status;
                }
                auto [status, segLocator] = GetSegmentLocator(segDir->GetIDirectory());
                RETURN_IF_STATUS_ERROR(status, "get segment[%d] locator failed", segmentId);
                if (!importVersionLocator.IsSameSrc(segLocator, false)) {
                    // A segment is imported only when its src matches the src of the version being
                    // imported; the match shows the segment originates from this version's source.
                    // Importing a segment whose src differs from the version's src causes bugs, e.g.:
                    // full segment 0 and incremental segment 1 come from different sources; after
                    // segments 0 and 1 have been merged into segment 2, re-importing the old full
                    // segment 0 would still bring it in.
                    continue;
                }
                framework::Locator baseLocator = baseVersion->GetLocator();
                if (!baseLocator.IsSameSrc(segLocator, false)) {
                    auto importStrategy = options.GetImportStrategy();
                    if (importStrategy == KEEP_SEGMENT_IGNORE_LOCATOR ||
                        importStrategy == KEEP_SEGMENT_OVERWRITE_LOCATOR) {
                        baseVersion->AddSegment(segmentId, schemaId);
                        newSegments.insert(segmentId);
                    } else {
                        AUTIL_LOG(ERROR, "segment locator src not same with base version locator src");
                        return Status::InternalError();
                    }
                } else {
                    // Same source: skip the segment only when the base locator is fully faster than it.
                    if (baseLocator.IsFasterThan(segLocator, false) !=
                        framework::Locator::LocatorCompareResult::LCR_FULLY_FASTER) {
                        baseVersion->AddSegment(segmentId, schemaId);
                        newSegments.insert(segmentId);
                    } else {
                        AUTIL_LOG(INFO, "filter segment, fence:%s segId:%d", validVersion.GetFenceName().c_str(),
                                  segmentId);
                    }
                }
            }
            baseVersion->MergeDescriptions(validVersion);
        }

        // 4 import Segment Descriptions
        auto baseSegDescs = baseVersion->GetSegmentDescriptions();
        if (baseSegDescs == nullptr) {
            auto status = Status::InternalError("base segment descriptions is nullptr");
            AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
            return status;
        }
        auto status = baseSegDescs->Import(otherSegDescs, newSegments);
        if (!status.IsOK()) {
            AUTIL_LOG(ERROR, "segment descriptions import failed");
            return status;
        }
        if (resultMaxTs > baseVersion->GetTimestamp()) {
            baseVersion->SetTimestamp(resultMaxTs);
        }
        // TODO(yonghao.fyh) : consider partial faster
        baseVersion->SetLocator(resultLocator);
    }
    // removeVersionRet is written by the scope guard above, so it is only meaningful after the block.
    if (!removeVersionRet) {
        return Status::IOError();
    }
    return Status::OK();
}