in aios/storage/indexlib/table/kv_table/index_task/KVTableBulkloadOperation.cpp [76:220]
// Executes one KV bulkload index task.
//
// Steps:
//  1. Read the bulkload parameters from the task descriptor:
//     - bulkload id;
//     - JSON list of external files;
//     - last sequence number;
//     - JSON import options;
//     - target version id.
//  2. Clone the on-disk version. Verify that it still carries the bulkload task `bulkloadId`.
//  3. Run KVTableExternalFileImportJob into the op fence directory to turn the external files
//     into internal files.
//  4. Group the internal files by sequence number (ascending, std::map order). Give each group a
//     fresh segment id after context.GetMaxMergedSegmentId(). Record every file in the
//     BULKLOAD_SEGMENT_CREATE_PLAN resource, then commit it.
//  5. Insert the new segment ids into level 0, in front of the first level-0 segment whose id is
//     greater than the segment id reserved for this bulkload (lastSequenceNumber >> 24).
//  6. Mark the bulkload task done in the version's task queue. Commit the version as a
//     VERSION_RESOURCE.
//
// Returns the first error encountered. Every failure is logged before returning.
Status KVTableBulkloadOperation::Execute(const framework::IndexTaskContext& context)
{
    Status status;
    std::string bulkloadId;
    std::string externalFilesStr;
    std::string lastSequenceNumberStr;
    std::string importExternalFileOptionsStr;
    if (!_desc.GetParameter(framework::PARAM_BULKLOAD_ID, bulkloadId) ||
        !_desc.GetParameter(framework::PARAM_EXTERNAL_FILES, externalFilesStr) ||
        !_desc.GetParameter(framework::PARAM_LAST_SEQUENCE_NUMBER, lastSequenceNumberStr) ||
        !_desc.GetParameter(framework::PARAM_IMPORT_EXTERNAL_FILE_OPTIONS, importExternalFileOptionsStr)) {
        status = Status::InternalError("failed to get params for bulkload");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    std::vector<std::string> externalFiles;
    framework::ImportExternalFileOptions options;
    status = indexlib::util::JsonUtil::FromString(externalFilesStr, &externalFiles);
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    status = indexlib::util::JsonUtil::FromString(importExternalFileOptionsStr, &options);
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    uint64_t lastSequenceNumber = 0;
    if (!autil::StringUtil::fromString(lastSequenceNumberStr, lastSequenceNumber)) {
        status =
            Status::InternalError("convert last sequence number str [%s] to num failed", lastSequenceNumberStr.c_str());
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    // Fail fast on each missing precondition. Continuing would dereference possibly-null
    // objects below, and a later check would overwrite the first (root-cause) error.
    auto resourceManager = context.GetResourceManager();
    if (resourceManager == nullptr) {
        status = Status::InternalError("failed to get resource manager");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    versionid_t versionId;
    if (!_desc.GetParameter(PARAM_TARGET_VERSION_ID, versionId)) {
        status = Status::InternalError("failed to get target version id");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    auto tabletData = context.GetTabletData();
    if (tabletData == nullptr) {
        status = Status::InternalError("failed to get tablet data");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    const auto& baseVersion = tabletData->GetOnDiskVersion();
    auto targetVersion = baseVersion.Clone("");
    auto indexTask = targetVersion.GetIndexTaskQueue()->Get(framework::BULKLOAD_TASK_TYPE, bulkloadId);
    if (indexTask == nullptr) {
        status = Status::InternalError("current bulkload task [%s] not found in base version [%d]", bulkloadId.c_str(),
                                       baseVersion.GetVersionId());
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    std::shared_ptr<indexlib::file_system::IDirectory> opDir;
    std::tie(status, opDir) = context.GetOpFenceRoot(_desc.GetId(), _desc.UseOpFenceDir());
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "get op fence root failed opId[%ld], status %s", _desc.GetId(), status.ToString().c_str());
        return status;
    }
    targetVersion.SetVersionId(versionId);

    auto tabletOptions = context.GetTabletOptions();
    if (tabletOptions == nullptr) {
        status = Status::InternalError("failed to get tablet options");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    KVTabletOptions kvTabletOptions(std::move(tabletOptions));

    // Make sure the target version carries level info; create a default topology if it has none.
    auto levelInfo = targetVersion.GetSegmentDescriptions()->GetLevelInfo();
    if (levelInfo == nullptr) {
        levelInfo = std::make_shared<indexlibv2::framework::LevelInfo>();
        levelInfo->Init(framework::topo_hash_mod, kvTabletOptions.GetLevelNum(), kvTabletOptions.GetShardNum());
        targetVersion.GetSegmentDescriptions()->SetLevelInfo(levelInfo);
    }
    // Bulkload segments are placed into level 0 below; validate before doing any import work.
    if (levelInfo->levelMetas.empty()) {
        status = Status::InternalError("level info of target version [%d] has no level", versionId);
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    KVTableExternalFileImportJob job(opDir->GetOutputPath(), options, kvTabletOptions.GetShardNum());
    status = job.Prepare(externalFiles);
    if (!status.IsOK()) {
        AUTIL_LOG(ERROR, "failed to prepare external files, status[%s]", status.ToString().c_str());
        return status;
    }
    auto internalFiles = job.GetInternalFiles();
    if (internalFiles.empty()) {
        status = Status::InternalError("invalid bulkload result, internal files empty");
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }

    std::shared_ptr<KVTableBulkloadSegmentCreatePlan> bulkloadSegmentCreatePlan;
    status = resourceManager->LoadResource(/*name=*/BULKLOAD_SEGMENT_CREATE_PLAN, /*type=*/BULKLOAD_SEGMENT_CREATE_PLAN,
                                           bulkloadSegmentCreatePlan);
    if (status.IsNotFound()) {
        status = resourceManager->CreateResource(/*name=*/BULKLOAD_SEGMENT_CREATE_PLAN,
                                                 /*type=*/BULKLOAD_SEGMENT_CREATE_PLAN, bulkloadSegmentCreatePlan);
    }
    RETURN_IF_STATUS_ERROR(status, "load or create [%s] failed", BULKLOAD_SEGMENT_CREATE_PLAN);

    // One new segment per distinct sequence number. std::map iterates in ascending key order,
    // so segment ids are handed out in ascending sequence-number order.
    std::map<uint64_t, std::vector<InternalFileInfo>> internalFileGroups;
    for (const auto& internalFile : internalFiles) {
        internalFileGroups[internalFile.sequenceNumber].push_back(internalFile);
    }
    segmentid_t currentMaxMergedSegmentId = context.GetMaxMergedSegmentId();
    std::vector<segmentid_t> bulkloadSegIds;
    bulkloadSegIds.reserve(internalFileGroups.size());
    for (auto& [_, fileGroup] : internalFileGroups) {
        segmentid_t segId = ++currentMaxMergedSegmentId;
        targetVersion.AddSegment(segId);
        bulkloadSegIds.push_back(segId);
        for (auto& file : fileGroup) {
            file.targetSegmentId = segId;
            file.targetLevelIdx = 0;
            bulkloadSegmentCreatePlan->AddInternalFileInfo(file);
        }
    }

    // Bulkload reserved a built segment id and used (segId << 24) as its sequence number.
    // Recover that id and insert the new segments right before the first level-0 segment whose
    // id is greater than it.
    auto reservedSegmentId = static_cast<segmentid_t>(lastSequenceNumber >> 24);
    auto& level0Segments = levelInfo->levelMetas[0].segments;
    auto iter = level0Segments.begin();
    for (; iter != level0Segments.end(); ++iter) {
        if (*iter > reservedSegmentId) {
            break;
        }
    }
    level0Segments.insert(iter, bulkloadSegIds.begin(), bulkloadSegIds.end());

    RETURN_IF_STATUS_ERROR(resourceManager->CommitResource(bulkloadSegmentCreatePlan->GetName()),
                           "commit resource failed, name[%s], type[%s].", bulkloadSegmentCreatePlan->GetName().c_str(),
                           bulkloadSegmentCreatePlan->GetType().c_str());

    std::shared_ptr<VersionResource> versionResource;
    status = resourceManager->LoadResource(targetVersion.GetVersionFileName(), VERSION_RESOURCE, versionResource);
    if (status.IsNotFound()) {
        status = resourceManager->CreateResource(targetVersion.GetVersionFileName(), VERSION_RESOURCE, versionResource);
    }
    RETURN_IF_STATUS_ERROR(status, "load or create [%s] failed", VERSION_RESOURCE);

    auto indexTaskQueue = targetVersion.GetIndexTaskQueue();
    if (!indexTaskQueue->Done(framework::BULKLOAD_TASK_TYPE, bulkloadId)) {
        status = Status::InternalError("mark bulkload task [%s] done in version [%d] failed", bulkloadId.c_str(),
                                       versionId);
        AUTIL_LOG(ERROR, "%s", status.ToString().c_str());
        return status;
    }
    versionResource->SetVersion(targetVersion);
    RETURN_IF_STATUS_ERROR(resourceManager->CommitResource(versionResource->GetName()),
                           "commit resource failed, name[%s], type[%s].", versionResource->GetName().c_str(),
                           versionResource->GetType().c_str());
    return status;
}