Status KVTableBulkloadOperation::Execute()

in aios/storage/indexlib/table/kv_table/index_task/KVTableBulkloadOperation.cpp [76:220]


// Executes one bulkload operation.
//
// Steps, in order:
//   1. Read the bulkload id, external file list, last sequence number and import options
//      from the operation description and decode them.
//   2. Fetch the resource manager and the target version id, clone the on-disk version and
//      verify that the bulkload task is present in its index task queue.
//   3. Prepare the external files through KVTableExternalFileImportJob into the op fence dir.
//   4. Group the resulting internal files by sequence number, give every group a new
//      segment id (continuing after context.GetMaxMergedSegmentId()), and record the files
//      in the bulkload segment create plan resource.
//   5. Insert the new segment ids into level 0 of the level info, mark the bulkload task as
//      done in the target version, and commit both the plan and the version resources.
//
// Returns the first non-OK status encountered; every failure is logged before returning.
Status KVTableBulkloadOperation::Execute(const framework::IndexTaskContext& context)
{
    Status status;
    std::string bulkloadId;
    std::string externalFilesStr;
    std::string lastSequenceNumberStr;
    std::string importExternalFileOptionsStr;

    if (!_desc.GetParameter(framework::PARAM_BULKLOAD_ID, bulkloadId) ||
        !_desc.GetParameter(framework::PARAM_EXTERNAL_FILES, externalFilesStr) ||
        !_desc.GetParameter(framework::PARAM_LAST_SEQUENCE_NUMBER, lastSequenceNumberStr) ||
        !_desc.GetParameter(framework::PARAM_IMPORT_EXTERNAL_FILE_OPTIONS, importExternalFileOptionsStr)) {
        status = Status::InternalError("failed to get params for bulkload");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    // The external file list and the import options are passed as JSON strings.
    std::vector<std::string> externalFiles;
    framework::ImportExternalFileOptions options;
    status = indexlib::util::JsonUtil::FromString(externalFilesStr, &externalFiles);
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    status = indexlib::util::JsonUtil::FromString(importExternalFileOptionsStr, &options);
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    uint64_t lastSequenceNumber = 0;
    if (!autil::StringUtil::fromString(lastSequenceNumberStr, lastSequenceNumber)) {
        status =
            Status::InternalError("convert last sequence number str [%s] to num failed", lastSequenceNumberStr.c_str());
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    // Fail fast on each missing prerequisite so the reported error is the first one hit
    // and nothing below runs with an unset version id.
    auto resourceManager = context.GetResourceManager();
    if (resourceManager == nullptr) {
        status = Status::InternalError("failed to get resource manager");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    versionid_t versionId;
    if (!_desc.GetParameter(PARAM_TARGET_VERSION_ID, versionId)) {
        status = Status::InternalError("failed to get target version id");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    // The target version starts as a clone of the current on-disk version; the bulkload
    // task being executed must already be registered in its index task queue.
    auto targetVersion = context.GetTabletData()->GetOnDiskVersion().Clone("");
    auto indexTask = targetVersion.GetIndexTaskQueue()->Get(framework::BULKLOAD_TASK_TYPE, bulkloadId);
    if (indexTask == nullptr) {
        status = Status::InternalError("current bulkload task [%s] not found in base version [%d]", bulkloadId.c_str(),
                                       context.GetTabletData()->GetOnDiskVersion().GetVersionId());
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    std::shared_ptr<indexlib::file_system::IDirectory> opDir;
    std::tie(status, opDir) = context.GetOpFenceRoot(_desc.GetId(), _desc.UseOpFenceDir());
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "get op fence root failed opId[%ld], status %s", _desc.GetId(), status.ToString().c_str());
        return status;
    }

    targetVersion.SetVersionId(versionId);
    auto tabletOptions = context.GetTabletOptions();
    if (tabletOptions == nullptr) {
        status = Status::InternalError("failed to get tablet options");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    KVTabletOptions kvTabletOptions(std::move(tabletOptions));

    // A version without level info gets a freshly initialized one sized from the tablet options.
    auto levelInfo = targetVersion.GetSegmentDescriptions()->GetLevelInfo();
    if (levelInfo == nullptr) {
        levelInfo = std::make_shared<indexlibv2::framework::LevelInfo>();
        levelInfo->Init(framework::topo_hash_mod, kvTabletOptions.GetLevelNum(), kvTabletOptions.GetShardNum());
        targetVersion.GetSegmentDescriptions()->SetLevelInfo(levelInfo);
    }
    // Level 0 is indexed below; reject a level info without any level before doing any work.
    if (levelInfo->levelMetas.empty()) {
        status = Status::InternalError("invalid level info, level metas empty");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    KVTableExternalFileImportJob job(opDir->GetOutputPath(), options, kvTabletOptions.GetShardNum());
    status = job.Prepare(externalFiles);
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "failed to prepare external files");
        return status;
    }
    auto internalFiles = job.GetInternalFiles();
    if (internalFiles.empty()) {
        status = Status::InternalError("invalid bulkload result, internal files empty");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    // Reuse the segment create plan resource if it already exists, otherwise create it.
    std::shared_ptr<KVTableBulkloadSegmentCreatePlan> bulkloadSegmentCreatePlan;
    status = resourceManager->LoadResource(/*name=*/BULKLOAD_SEGMENT_CREATE_PLAN, /*type=*/BULKLOAD_SEGMENT_CREATE_PLAN,
                                           bulkloadSegmentCreatePlan);
    if (status.IsNotFound()) {
        status = resourceManager->CreateResource(/*name=*/BULKLOAD_SEGMENT_CREATE_PLAN,
                                                 /*type=*/BULKLOAD_SEGMENT_CREATE_PLAN, bulkloadSegmentCreatePlan);
    }
    RETURN_IF_STATUS_ERROR(status, "load or create [%s] failed", BULKLOAD_SEGMENT_CREATE_PLAN);

    // Files sharing a sequence number end up in the same new segment. std::map keeps the
    // groups ordered by sequence number, so segment ids are assigned in ascending order.
    std::map<uint64_t, std::vector<InternalFileInfo>> internalFileGroups;
    for (const auto& internalFile : internalFiles) {
        internalFileGroups[internalFile.sequenceNumber].push_back(internalFile);
    }
    segmentid_t currentMaxMergedSegmentId = context.GetMaxMergedSegmentId();
    std::vector<segmentid_t> bulkloadSegIds;
    for (auto& [_, fileGroup] : internalFileGroups) {
        segmentid_t segId = ++currentMaxMergedSegmentId;
        targetVersion.AddSegment(segId);
        bulkloadSegIds.push_back(segId);
        for (auto& file : fileGroup) {
            file.targetSegmentId = segId;
            file.targetLevelIdx = 0;
            bulkloadSegmentCreatePlan->AddInternalFileInfo(file);
        }
    }

    // Bulkload reserved a built segment id and used (id << 24) as the sequence number, so
    // shifting back recovers that id. The new segments are placed in level 0 right before
    // the first segment whose id is greater than the reserved one.
    segmentid_t reservedSegmentId = static_cast<segmentid_t>(lastSequenceNumber >> 24);
    auto& level0Segments = levelInfo->levelMetas[0].segments;
    auto iter = level0Segments.begin();
    for (; iter != level0Segments.end(); ++iter) {
        if (*iter > reservedSegmentId) {
            break;
        }
    }
    level0Segments.insert(iter, bulkloadSegIds.begin(), bulkloadSegIds.end());

    RETURN_IF_STATUS_ERROR(resourceManager->CommitResource(bulkloadSegmentCreatePlan->GetName()),
                           "commit resource failed, name[%s], type[%s].", bulkloadSegmentCreatePlan->GetName().c_str(),
                           bulkloadSegmentCreatePlan->GetType().c_str());

    // Publish the target version: load or create its resource, mark this bulkload task as
    // done in the version's task queue, then store and commit the version.
    std::shared_ptr<VersionResource> versionResource;
    status = resourceManager->LoadResource(targetVersion.GetVersionFileName(), VERSION_RESOURCE, versionResource);
    if (status.IsNotFound()) {
        status = resourceManager->CreateResource(targetVersion.GetVersionFileName(), VERSION_RESOURCE, versionResource);
    }
    RETURN_IF_STATUS_ERROR(status, "load or create [%s] failed", VERSION_RESOURCE);
    auto indexTaskQueue = targetVersion.GetIndexTaskQueue();
    if (!indexTaskQueue->Done(framework::BULKLOAD_TASK_TYPE, bulkloadId)) {
        status = Status::InternalError("failed to mark bulkload task [%s] as done", bulkloadId.c_str());
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    versionResource->SetVersion(targetVersion);
    RETURN_IF_STATUS_ERROR(resourceManager->CommitResource(versionResource->GetName()),
                           "commit resource failed, name[%s], type[%s].", versionResource->GetName().c_str(),
                           versionResource->GetType().c_str());
    return status;
}