aios/storage/indexlib/framework/VersionMerger.cpp (354 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "indexlib/framework/VersionMerger.h"
#include <assert.h>
#include <type_traits>
#include "autil/EnvUtil.h"
#include "autil/Scope.h"
#include "autil/legacy/legacy_jsonizable.h"
#include "future_lite/coro/CoAwait.h"
#include "future_lite/coro/LazyHelper.h"
#include "future_lite/experimental/coroutine.h"
#include "indexlib/config/CustomIndexTaskClassInfo.h"
#include "indexlib/file_system/Directory.h"
#include "indexlib/framework/IMetrics.h"
#include "indexlib/framework/MetricsManager.h"
#include "indexlib/framework/TabletData.h"
#include "indexlib/framework/VersionCoord.h"
#include "indexlib/framework/VersionLoader.h"
#include "indexlib/framework/index_task/CustomIndexTaskFactory.h"
#include "indexlib/framework/index_task/IndexTaskContext.h"
#include "indexlib/framework/index_task/IndexTaskMetrics.h"
#include "indexlib/framework/index_task/IndexTaskPlan.h"
namespace indexlibv2::framework {
// Logger setup for this translation unit (category argument: indexlib.framework, class: VersionMerger).
AUTIL_LOG_SETUP(indexlib.framework, VersionMerger);
// Logging helper that prefixes every message with the tablet name and the address of the current object.
// It expands to uses of `_tabletName` and `this`, so it may only be used inside VersionMerger member functions.
// The macro is #undef'ed at the end of this file.
#define TABLET_LOG(level, format, args...) AUTIL_LOG(level, "[%s] [%p] " format, _tabletName.c_str(), this, ##args)
// Creates a merger for tablet `tabletName` whose versions are loaded from `indexRoot`.
//   controller  - merge controller used to submit, wait for, cancel and clean merge tasks.
//   planCreator - default plan creator, used when the task context designates no custom task.
//   manager     - metrics manager; may be null, in which case no task metrics are registered.
//   isOnline    - stored in _isOnline; consulted by EnsureRecovered() for the fast-forward check.
VersionMerger::VersionMerger(std::string tabletName, std::string indexRoot,
                             std::shared_ptr<ITabletMergeController> controller,
                             std::unique_ptr<IIndexTaskPlanCreator> planCreator, MetricsManager* manager,
                             bool isOnline)
    : _tabletName(std::move(tabletName))
    , _indexRoot(std::move(indexRoot))
    , _controller(std::move(controller))
    , _planCreator(std::move(planCreator))
    , _lastProposedVersionId(INVALID_VERSIONID)
    , _recovered(false)
    , _stopped(false)
    , _isOnline(isOnline)
{
    // Value used when the environment variable below is not set.
    constexpr size_t kSkipCleanTaskDefault = 0;

    RegisterMetrics(manager);
    // A non-zero "tablet_merge_skip_clean_task" makes FinishTask() keep temporary files.
    _skipCleanTask = autil::EnvUtil::getEnv<size_t>("tablet_merge_skip_clean_task", kSkipCleanTaskDefault);
}
// Stores `version` as the current base version. The write is protected by _dataMutex.
void VersionMerger::UpdateVersion(const Version& version)
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    _currentBaseVersion = version;
    TABLET_LOG(INFO, "version [%d] updated", _currentBaseVersion.GetVersionId());
}
// Records the locator of the committed version. The write is protected by _dataMutex.
// The stored locator is later read by GetCommittedVersionTimestamp().
void VersionMerger::UpdateCommittedVersionLocator(const Locator& locator)
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    _committedVersionLocator = locator;
    TABLET_LOG(INFO, "Committed Version Locator [%s] updated", _committedVersionLocator.DebugString().c_str());
}
// Returns the id of the version a new merge may start from, or INVALID_VERSIONID when no merge should be
// proposed yet. A merge is held back while a previous merge result exists and either
//   - it has not been committed (committedVersionId == INVALID_VERSIONID), or
//   - the current base version is still older than the version that committed it.
versionid_t VersionMerger::GetBaseVersion() const
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    const auto currentId = _currentBaseVersion.GetVersionId();
    if (!_mergedVersionInfo) {
        // No earlier merge result to wait for.
        return currentId;
    }
    const auto committedId = _mergedVersionInfo->committedVersionId;
    const bool previousMergeCommitted = (committedId != INVALID_VERSIONID);
    if (previousMergeCommitted && currentId >= committedId) {
        return currentId;
    }
    TABLET_LOG(INFO,
               "version [%d] would not propose, current merged base version[%d], target version[%d], committed "
               "version[%d]",
               currentId, _mergedVersionInfo->baseVersion.GetVersionId(),
               _mergedVersionInfo->targetVersion.GetVersionId(), committedId);
    return INVALID_VERSIONID;
}
// Creates a task plan for `context` and submits it to the merge controller.
// Returns the failing status if the custom plan creator or the plan cannot be created, OK without
// submitting anything when the plan is empty, and otherwise the result of SubmitMergeTask().
future_lite::coro::Lazy<Status> VersionMerger::SubmitTask(IndexTaskContext* context)
{
    // TODO: delete when use taskMeta to specify designate task
    // When the on-disk version's schema id differs from its read schema id, force an "alter_table" task.
    auto tabletData = context->GetTabletData();
    if (tabletData) {
        const auto& diskVersion = tabletData->GetOnDiskVersion();
        const bool schemaChanged = diskVersion.GetSchemaId() != diskVersion.GetReadSchemaId();
        if (schemaChanged) {
            context->SetDesignateTask("alter_table", "alter_table");
        }
    }
    ///////////////////////

    // A designated task config, if present, supplies its own plan creator.
    std::unique_ptr<IIndexTaskPlanCreator> customPlanCreator;
    auto designateTaskConfig = context->GetDesignateTaskConfig();
    if (designateTaskConfig) {
        auto [creatorStatus, createdCreator] = CustomIndexTaskFactory::GetCustomPlanCreator(designateTaskConfig);
        if (!creatorStatus.IsOK()) {
            co_return creatorStatus;
        }
        customPlanCreator = std::move(createdCreator);
    }

    // Prefer the custom creator; fall back to the default one given at construction.
    IIndexTaskPlanCreator* activeCreator = customPlanCreator ? customPlanCreator.get() : _planCreator.get();
    auto [planStatus, plan] = activeCreator->CreateTaskPlan(context);
    if (!planStatus.IsOK()) {
        TABLET_LOG(ERROR, "create task plan failed: %s", planStatus.ToString().c_str());
        co_return planStatus;
    }
    if (!plan) {
        TABLET_LOG(INFO, "empty plan, do not merge");
        co_return Status::OK();
    }

    if (_taskMetrics) {
        _taskMetrics->SetCurrentTask(plan->GetTaskType(), plan->GetTaskName());
    }
    co_return co_await _controller->SubmitMergeTask(std::move(plan), context);
}
// Stops the merge controller, then blocks until _runMutex can be acquired and marks the merger as stopped.
// InnerExecuteTask() holds _runMutex while it runs, so this waits for an in-flight run to release it.
void VersionMerger::WaitStop()
{
    _controller->Stop();
    auto markStopped = [this]() -> future_lite::coro::Lazy<> {
        co_await _runMutex.coLock();
        _stopped = true;
        _runMutex.unlock();
        co_return;
    };
    future_lite::coro::syncAwait(markStopped());
}
// Finalizes the current task.
//   baseVersionId   - remembered as the last proposed version unless it is INVALID_VERSIONID.
//   removeTempFiles - forwarded to the controller's CleanTask(); forced to false when _skipCleanTask is set.
void VersionMerger::FinishTask(versionid_t baseVersionId, bool removeTempFiles)
{
    TABLET_LOG(INFO, "version [%d] proposed", baseVersionId);
    const bool hasValidBase = (baseVersionId != indexlibv2::INVALID_VERSIONID);
    if (hasValidBase) {
        _lastProposedVersionId = baseVersionId;
    }

    bool cleanTempFiles = removeTempFiles;
    if (_skipCleanTask) {
        TABLET_LOG(INFO, "clean task skipped");
        cleanTempFiles = false;
    }

    if (_taskMetrics) {
        _taskMetrics->FinishCurrentTask();
    }
    // NOTE(review): the result of CleanTask() is intentionally unused here; a failed clean is not reported.
    [[maybe_unused]] auto cleanResult = _controller->CleanTask(cleanTempFiles);
}
// Publishes the base and target versions of a finished merge as the latest merged-version info.
// Expects `mergeTaskStatus.code` to be MergeTaskStatus::DONE (checked by assert only).
// Always returns Status::OK().
Status VersionMerger::FillMergedVersionInfo(const MergeTaskStatus& mergeTaskStatus)
{
    assert(mergeTaskStatus.code == MergeTaskStatus::DONE);

    auto mergedInfo = std::make_shared<MergedVersionInfo>();
    mergedInfo->baseVersion = mergeTaskStatus.baseVersion;
    mergedInfo->targetVersion = mergeTaskStatus.targetVersion;
    TABLET_LOG(INFO, "finish merge result: base version[%d], target version[%d]",
               mergedInfo->baseVersion.GetVersionId(), mergedInfo->targetVersion.GetVersionId());

    // Only the pointer swap needs to happen under the data mutex.
    {
        std::lock_guard<std::mutex> dataGuard(_dataMutex);
        _mergedVersionInfo = std::move(mergedInfo);
    }
    return Status::OK();
}
// Loads version `versionId` from the physical directory at _indexRoot.
// Returns {InternalError, empty Version} for INVALID_VERSIONID, {loader status, version} when loading
// fails, and {OK, version} on success.
std::pair<Status, Version> VersionMerger::LoadVersion(versionid_t versionId) const
{
    if (versionId == INVALID_VERSIONID) {
        return {Status::InternalError("invalid merged version"), Version()};
    }

    auto rootDir = indexlib::file_system::Directory::GetPhysicalDirectory(_indexRoot);
    Version loaded;
    auto loadStatus = VersionLoader::GetVersion(rootDir, versionId, &loaded);
    if (loadStatus.IsOK()) {
        return {Status::OK(), std::move(loaded)};
    }

    TABLET_LOG(ERROR, "load version [%d] from [%s] failed: %s", versionId, _indexRoot.c_str(),
               loadStatus.ToString().c_str());
    return {loadStatus, std::move(loaded)};
}
// Returns a copy of the shared pointer to the latest merged-version info (may be null).
// The copy is taken while holding _dataMutex.
std::shared_ptr<VersionMerger::MergedVersionInfo> VersionMerger::GetMergedVersionInfo()
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    return _mergedVersionInfo;
}
// Const overload: returns a reference to the stored merged-version info pointer (may be null).
// NOTE(review): _dataMutex is released when this function returns, so the caller reads the member through
// the returned reference without holding the lock, while FillMergedVersionInfo() may replace it concurrently.
// Callers that need a stable snapshot should copy the pointer; confirm the call sites.
const std::shared_ptr<VersionMerger::MergedVersionInfo>& VersionMerger::GetMergedVersionInfo() const
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    return _mergedVersionInfo;
}
// Returns the first component of the committed version locator's offset, read under _dataMutex.
// UpdateMetrics() divides this value by 1000000 before comparing it with seconds, so it is presumably
// a microsecond timestamp - TODO confirm against Locator.
int64_t VersionMerger::GetCommittedVersionTimestamp() const
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    return _committedVersionLocator.GetOffset().first;
}
// True when a merge result exists whose committedVersionId is still INVALID_VERSIONID,
// i.e. the merged version has not been committed yet.
bool VersionMerger::NeedCommit() const
{
    std::lock_guard<std::mutex> dataGuard(_dataMutex);
    if (!_mergedVersionInfo) {
        return false;
    }
    return _mergedVersionInfo->committedVersionId == INVALID_VERSIONID;
}
// Runs the task identified by `taskType` / `taskName` with `params` on top of `sourceVersion`.
// Thin forwarder to InnerExecuteTask(); see that function for the meaning of the returned pair.
future_lite::coro::Lazy<std::pair<Status, versionid_t>>
VersionMerger::ExecuteTask(const Version& sourceVersion, const std::string& taskType, const std::string& taskName,
                           const std::map<std::string, std::string>& params)
{
    auto taskResult = co_await InnerExecuteTask(sourceVersion, taskType, taskName, params);
    co_return taskResult;
}
// Runs one merge round on the current base version with an empty task type, task name and parameter set.
future_lite::coro::Lazy<std::pair<Status, versionid_t>> VersionMerger::Run()
{
    const std::string emptyTaskType;
    const std::string emptyTaskName;
    const IndexTaskContext::Parameters emptyParams;
    co_return co_await InnerExecuteTask(_currentBaseVersion, emptyTaskType, emptyTaskName, emptyParams);
}
// Recovers the merge controller once; later calls return OK immediately.
// If a task is still running after recovery and this merger is online, the task is cancelled when the
// current base version cannot fast-forward from the task's base version.
// _recovered is set only when every step succeeded, so a failed recovery is retried on the next call.
future_lite::coro::Lazy<Status> VersionMerger::EnsureRecovered()
{
    if (_recovered) {
        co_return Status::OK();
    }

    auto recoverStat = co_await _controller->Recover();
    if (!recoverStat.IsOK()) {
        TABLET_LOG(ERROR, "recover merge controller failed: %s", recoverStat.ToString().c_str());
        co_return Status::InternalError();
    }

    auto runningTask = _controller->GetRunningTaskStat();
    if (runningTask) {
        assert(runningTask->baseVersionId != INVALID_VERSIONID);
        auto [versionStat, baseVersion] = LoadVersion(runningTask->baseVersionId);
        if (!versionStat.IsOK()) {
            TABLET_LOG(ERROR, "load base version [%d] failed", runningTask->baseVersionId);
            co_return versionStat;
        }

        VersionCoord headVersionCoord(baseVersion.GetVersionId(), baseVersion.GetFenceName());
        TABLET_LOG(INFO, "recovered version [%d, %s], current version line [%s]", baseVersion.GetVersionId(),
                   baseVersion.GetFenceName().c_str(),
                   autil::legacy::ToJsonString(_currentBaseVersion.GetVersionLine(), true).c_str());
        TABLET_LOG(INFO, "begin check fast forward, isOnlien[%d]", _isOnline);

        // Only online mergers perform the fast-forward check.
        const bool mustCancel =
            _isOnline && !_currentBaseVersion.CanFastFowardFrom(headVersionCoord, /*hasBuildingSegment*/ false);
        if (mustCancel) {
            TABLET_LOG(INFO, "running version [%d, %s] can't fastfowd to current version [%d], cancel it",
                       baseVersion.GetVersionId(), baseVersion.GetFenceName().c_str(),
                       _currentBaseVersion.GetVersionId());
            auto cancelStat = co_await _controller->CancelCurrentTask();
            if (!cancelStat.IsOK()) {
                TABLET_LOG(ERROR, "cancel current task failed");
                co_return cancelStat;
            }
        }
    }

    _recovered = true;
    co_return Status::OK();
}
// Runs one merge round and returns {status, merged target version id}.
//
//   sourceVersion - version to merge from; when valid and different from the current base version it
//                   replaces the current base version first.
//   taskType / taskName / params - forwarded to the controller when the task context is created.
//
// Returns:
//   {OK, targetVersionId}       a merge finished and its result was recorded.
//   {OK, INVALID_VERSIONID}     nothing to merge (empty plan) or the current version was already proposed.
//   {error, INVALID_VERSIONID}  another run holds _runMutex, the merger is stopped, recovery failed, no base
//                               version may be proposed, or context creation / submit / wait / merge failed.
//
// Only one run may be active at a time: _runMutex is acquired with tryLock() and released by a scope guard
// on every exit path.
future_lite::coro::Lazy<std::pair<Status, versionid_t>>
VersionMerger::InnerExecuteTask(const Version& sourceVersion, const std::string& taskType, const std::string& taskName,
                                const std::map<std::string, std::string>& params)
{
    auto sourceVersionId = sourceVersion.GetVersionId();
    if (sourceVersionId != INVALID_VERSIONID && sourceVersionId != _currentBaseVersion.GetVersionId()) {
        UpdateVersion(sourceVersion);
    }
    if (!_runMutex.tryLock()) {
        TABLET_LOG(WARN, "version merger is running or stopping");
        co_return std::make_pair(Status::InternalError(), INVALID_VERSIONID);
    }
    // Install the unlock guard right after the lock is acquired so that every exit path below releases it.
    // Previously the guard was created after the _stopped check, so returning from a stopped merger left
    // _runMutex locked forever (a later WaitStop() would block on coLock()).
    autil::ScopeGuard guard([this]() { _runMutex.unlock(); });
    if (_stopped.load()) {
        TABLET_LOG(ERROR, "version merger is stopped");
        co_return std::make_pair(Status::InternalError(), INVALID_VERSIONID);
    }
    auto recoverStatus = co_await EnsureRecovered();
    if (!recoverStatus.IsOK()) {
        TABLET_LOG(ERROR, "recover failed");
        co_return std::make_pair(recoverStatus, INVALID_VERSIONID);
    }
    std::string taskTraceId;
    // Submit a new task only when the controller has no task in flight (e.g. one recovered above);
    // otherwise fall through and wait for the running task's result.
    if (!_controller->GetRunningTaskStat()) {
        versionid_t currentVersionId = GetBaseVersion();
        if (currentVersionId == INVALID_VERSIONID) {
            TABLET_LOG(ERROR, "currentVersionId is invalid version");
            co_return std::make_pair(Status::InternalError(), INVALID_VERSIONID);
        }
        if (currentVersionId != _lastProposedVersionId) {
            TABLET_LOG(INFO, "version merger run for version [%d]", currentVersionId);
            auto context = _controller->CreateTaskContext(currentVersionId, taskType, taskName, taskTraceId, params);
            if (!context) {
                TABLET_LOG(ERROR, "create context failed");
                co_return std::make_pair(Status::InternalError("create context failed"), INVALID_VERSIONID);
            }
            auto submitStatus = co_await SubmitTask(context.get());
            if (!submitStatus.IsOK()) {
                TABLET_LOG(ERROR, "run merge task for version [%d] failed: %s", currentVersionId,
                           submitStatus.ToString().c_str());
                FinishTask(INVALID_VERSIONID, /*removeTempFiles*/ false);
                co_return std::make_pair(submitStatus, INVALID_VERSIONID);
            }
            if (!_controller->GetRunningTaskStat()) {
                // empty plan
                TABLET_LOG(INFO, "no need merge for version [%d]", currentVersionId);
                FinishTask(currentVersionId, /*removeTempFiles*/ true);
                co_return std::make_pair(Status::OK(), INVALID_VERSIONID);
            }
        } else {
            // latest version already proposed.
            co_return std::make_pair(Status::OK(), INVALID_VERSIONID);
        }
    }
    auto [status, mergeTaskStatus] = co_await _controller->WaitMergeResult();
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "wait merge result failed: %s", status.ToString().c_str());
        co_return std::make_pair(Status::InternalError(), INVALID_VERSIONID);
    }
    auto baseVersionId = mergeTaskStatus.baseVersion.GetVersionId();
    auto targetVersionId = mergeTaskStatus.targetVersion.GetVersionId();
    if (mergeTaskStatus.code == MergeTaskStatus::ERROR) {
        TABLET_LOG(ERROR, "merge error for version [%d]", baseVersionId);
        FinishTask(INVALID_VERSIONID, /*removeTempFiles*/ false);
        co_return std::make_pair(Status::InternalError(), INVALID_VERSIONID);
    } else {
        auto fillStatus = FillMergedVersionInfo(mergeTaskStatus);
        if (fillStatus.IsOK()) {
            FinishTask(baseVersionId, /*removeTempFiles=*/true);
            co_return std::make_pair(fillStatus, targetVersionId);
        } else if (fillStatus.IsIOError()) {
            // On an IO error the task is left as is (FinishTask is not called).
            co_return std::make_pair(fillStatus, INVALID_VERSIONID);
        } else {
            FinishTask(INVALID_VERSIONID, /*removeTempFiles=*/false);
            co_return std::make_pair(fillStatus, INVALID_VERSIONID);
        }
    }
}
// Returns the controller's view of the currently running task, or an empty optional when there is none.
std::optional<ITabletMergeController::TaskStat> VersionMerger::GetRunningTaskStat() const
{
    auto runningStat = _controller->GetRunningTaskStat();
    return runningStat;
}
// Obtains the "INDEX_TASK" metrics object from `manager` (created through the given factory callback)
// and stores it in _taskMetrics. Does nothing when `manager` is null, leaving _taskMetrics unset.
void VersionMerger::RegisterMetrics(MetricsManager* manager)
{
    if (manager == nullptr) {
        return;
    }
    auto registered = manager->CreateMetrics("INDEX_TASK", [&]() -> std::shared_ptr<framework::IMetrics> {
        return std::make_shared<IndexTaskMetrics>(_tabletName, manager->GetMetricsReporter());
    });
    // The "INDEX_TASK" entry is expected to be an IndexTaskMetrics; the assert catches a type mismatch.
    _taskMetrics = std::dynamic_pointer_cast<IndexTaskMetrics>(registered);
    assert(_taskMetrics);
}
// Refreshes the task metrics from the last merge result, the running task and, when `tabletData` is
// non-null, the index task history of its on-disk version. No-op when metrics were never registered.
void VersionMerger::UpdateMetrics(TabletData* tabletData)
{
    if (!_taskMetrics) {
        return;
    }

    // Last finished merge, if any.
    auto mergedInfo = GetMergedVersionInfo();
    if (mergedInfo) {
        _taskMetrics->SetmergeBaseVersionIdValue(mergedInfo->baseVersion.GetVersionId());
        _taskMetrics->SetmergeTargetVersionIdValue(mergedInfo->targetVersion.GetVersionId());
        _taskMetrics->SetmergeCommittedVersionIdValue(mergedInfo->committedVersionId);
        auto committedTs = GetCommittedVersionTimestamp();
        if (committedTs > 0) {
            // committedTs is divided by 1000000 to bring it to the seconds scale of the current time.
            int64_t delaySeconds = autil::TimeUtility::currentTimeInSeconds() - committedTs / 1000000;
            _taskMetrics->SetmergeCommittedVersionDelayValue(delaySeconds);
        }
    }

    // Currently running merge; metrics are reset when nothing is running.
    auto runningStat = GetRunningTaskStat();
    if (!runningStat) {
        _taskMetrics->SetrunningMergeBaseVersionIdValue(INVALID_VERSIONID);
        _taskMetrics->SetrunningMergeLeftOpsValue(0);
        _taskMetrics->SetrunningMergeTotalOpsValue(0);
    } else {
        _taskMetrics->SetrunningMergeBaseVersionIdValue(runningStat->baseVersionId);
        _taskMetrics->SetrunningMergeLeftOpsValue(runningStat->totalOpCount - runningStat->finishedOpCount);
        _taskMetrics->SetrunningMergeTotalOpsValue(runningStat->totalOpCount);
    }

    if (tabletData == nullptr) {
        return;
    }
    const auto& taskHistory = tabletData->GetOnDiskVersion().GetIndexTaskHistory();
    _taskMetrics->UpdateTaskHistory(taskHistory);
}
// Lets the task metrics object fill `infoMap`; leaves `infoMap` untouched when metrics were never registered.
void VersionMerger::FillMetricsInfo(std::map<std::string, std::string>& infoMap)
{
    if (_taskMetrics) {
        _taskMetrics->FillMetricsInfo(infoMap);
    }
}
#undef TABLET_LOG
} // namespace indexlibv2::framework