in cloud/src/meta-service/meta_service.cpp [2433:2773]
/**
 * Acquire (or re-enter) the table-level delete bitmap update lock using the v2 protocol.
 *
 * The lock value (DeleteBitmapUpdateLockPB) is stored at
 * meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}).
 *
 * - Load requests (lock_id != COMPACTION_DELETE_BITMAP_LOCK_ID):
 *   - They get the lock when it is absent, expired, or already held by the same lock_id.
 *   - They add their initiator to `lock_info.initiators`.
 *   - If the current holder is compaction, every mow tablet compaction kv of the table must be
 *     expired. Expired kvs are removed in the same txn; otherwise LOCK_CONFLICT is returned.
 * - Compaction requests (lock_id == COMPACTION_DELETE_BITMAP_LOCK_ID):
 *   - They record their initiator in a separate mow tablet compaction kv.
 *   - When they create the lock, or take it over from an expired load lock, leftover
 *     compaction kvs of the table are removed first.
 *
 * A TXN_CONFLICT on commit is retried once, and only when all of these hold:
 * - the request is a compaction request;
 * - it created or took over the lock;
 * - config::delete_bitmap_enable_retry_txn_conflict is enabled.
 *
 * If the request sets require_compaction_stats, the response additionally gets, per entry of
 * request->tablet_indexes() and in that order:
 * - base/cumulative compaction counts;
 * - cumulative points;
 * - tablet states.
 * The lock is then re-checked with check_delete_bitmap_lock().
 */
void MetaServiceImpl::get_delete_bitmap_update_lock_v2(
        google::protobuf::RpcController* controller,
        const GetDeleteBitmapUpdateLockRequest* request,
        GetDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done) {
    VLOG_DEBUG << "get delete bitmap update lock in v2 for table=" << request->table_id()
               << ",lock id=" << request->lock_id() << ",initiator=" << request->initiator();
    RPC_PREPROCESS(get_delete_bitmap_update_lock);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_delete_bitmap_update_lock)
    auto table_id = request->table_id();
    std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
    bool first_retry = true;
    int64_t retry = 0;
    // At most two attempts; the second one is only reached through the fast-retry branch
    // after a commit conflict.
    while (retry <= 1) {
        retry++;
        response->Clear();
        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv_->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            msg = "failed to init txn";
            return;
        }
        std::string lock_val;
        DeleteBitmapUpdateLockPB lock_info;
        err = txn->get(lock_key, &lock_val);
        if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            ss << "failed to get delete bitmap update lock, instance_id=" << instance_id
               << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err;
            msg = ss.str();
            code = MetaServiceCode::KV_TXN_GET_ERR;
            return;
        }
        using namespace std::chrono;
        int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
        int64_t expiration = now + request->expiration();

        // Clears every mow tablet compaction kv of this table, then writes this compaction's
        // kv. Returns false (code/msg already set by the callee) if the write fails.
        auto reset_mow_tablet_compaction_keys = [&](std::string& current_lock_msg) -> bool {
            // in normal case, this should remove 0 kvs
            // but when upgrade ms, if there are ms with old and new versions, it works
            std::string tablet_compaction_key_begin =
                    mow_tablet_compaction_key({instance_id, table_id, 0});
            std::string tablet_compaction_key_end =
                    mow_tablet_compaction_key({instance_id, table_id, INT64_MAX});
            txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end);
            LOG(INFO) << "remove mow tablet compaction kv, begin="
                      << hex(tablet_compaction_key_begin)
                      << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id;
            if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id,
                                               request->lock_id(), request->initiator(),
                                               expiration, current_lock_msg)) {
                return false;
            }
            return true;
        };

        // Set when this request creates the lock or takes it over from an expired load lock.
        // It gates the fast retry on commit conflict below.
        bool lock_key_not_found = false;
        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            lock_key_not_found = true;
            std::string current_lock_msg = "lock key not found";
            lock_info.set_lock_id(request->lock_id());
            // compaction does not use this expiration, only used when upgrade ms
            lock_info.set_expiration(expiration);
            if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) {
                lock_info.add_initiators(request->initiator());
            } else if (!reset_mow_tablet_compaction_keys(current_lock_msg)) {
                return;
            }
            if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(),
                                                   request->initiator(), lock_key, lock_info,
                                                   current_lock_msg)) {
                return;
            }
        } else if (err == TxnErrorCode::TXN_OK) {
            if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                msg = "failed to parse DeleteBitmapUpdateLockPB";
                return;
            }
            if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) {
                // Current holder is a load.
                if (lock_info.expiration() > 0 && lock_info.expiration() < now) {
                    LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id="
                              << lock_info.lock_id() << " table_id=" << table_id
                              << " expiration=" << lock_info.expiration() << " now=" << now
                              << " initiator_size=" << lock_info.initiators_size();
                    lock_info.clear_initiators();
                } else if (lock_info.lock_id() != request->lock_id()) {
                    ss << "already be locked by lock_id=" << lock_info.lock_id()
                       << " expiration=" << lock_info.expiration() << " now=" << now
                       << ", request lock_id=" << request->lock_id() << " table_id=" << table_id
                       << " initiator=" << request->initiator();
                    msg = ss.str();
                    code = MetaServiceCode::LOCK_CONFLICT;
                    return;
                }
                std::string current_lock_msg =
                        "original lock_id=" + std::to_string(lock_info.lock_id());
                lock_info.set_lock_id(request->lock_id());
                // compaction does not use the expiration, only used when upgrade ms
                lock_info.set_expiration(expiration);
                if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) {
                    // Re-entrant load: record the initiator only once.
                    bool found = false;
                    for (const auto& initiator : lock_info.initiators()) {
                        if (request->initiator() == initiator) {
                            found = true;
                            break;
                        }
                    }
                    if (!found) {
                        lock_info.add_initiators(request->initiator());
                    }
                } else {
                    // Compaction takes over an expired load lock.
                    lock_key_not_found = true;
                    if (!reset_mow_tablet_compaction_keys(current_lock_msg)) {
                        return;
                    }
                }
                if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(),
                                                       request->initiator(), lock_key, lock_info,
                                                       current_lock_msg)) {
                    return;
                }
            } else {
                // Current holder is compaction.
                if (request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID) {
                    // Compactions share the lock; each one only writes its own kv.
                    std::string current_lock_msg = "locked by lock_id=-1";
                    if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id,
                                                       request->lock_id(), request->initiator(),
                                                       expiration, current_lock_msg)) {
                        return;
                    }
                } else {
                    // check if compaction key is expired
                    bool has_unexpired_compaction = false;
                    int64_t unexpired_expiration = 0;
                    std::string key0 = mow_tablet_compaction_key({instance_id, table_id, 0});
                    std::string key1 = mow_tablet_compaction_key({instance_id, table_id + 1, 0});
                    MowTabletCompactionPB mow_tablet_compaction;
                    std::unique_ptr<RangeGetIterator> it;
                    int64_t expired_compaction_num = 0;
                    do {
                        err = txn->get(key0, key1, &it);
                        if (err != TxnErrorCode::TXN_OK) {
                            code = cast_as<ErrCategory::READ>(err);
                            ss << "internal error, failed to get mow tablet compaction, err="
                               << err;
                            msg = ss.str();
                            LOG(WARNING) << msg;
                            return;
                        }
                        while (it->has_next() && !has_unexpired_compaction) {
                            auto [k, v] = it->next();
                            if (!mow_tablet_compaction.ParseFromArray(v.data(), v.size()))
                                    [[unlikely]] {
                                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                                msg = "failed to parse MowTabletCompactionPB";
                                return;
                            }
                            if (mow_tablet_compaction.expiration() > 0 &&
                                mow_tablet_compaction.expiration() < now) {
                                LOG(INFO) << "remove mow tablet compaction lock. table_id="
                                          << table_id << " lock_id=" << lock_info.lock_id()
                                          << " expiration=" << mow_tablet_compaction.expiration()
                                          << " now=" << now << " key=" << hex(k);
                                txn->remove(k);
                                expired_compaction_num++;
                            } else {
                                has_unexpired_compaction = true;
                                unexpired_expiration = mow_tablet_compaction.expiration();
                            }
                        }
                        key0 = it->next_begin_key(); // Update to next smallest key for iteration
                    } while (it->more() && !has_unexpired_compaction);
                    if (has_unexpired_compaction) {
                        // TODO print initiator
                        ss << "already be locked by lock_id=" << lock_info.lock_id()
                           << " expiration=" << unexpired_expiration << " now=" << now
                           << ". request lock_id=" << request->lock_id() << " table_id=" << table_id
                           << " initiator=" << request->initiator();
                        msg = ss.str();
                        code = MetaServiceCode::LOCK_CONFLICT;
                        return;
                    }
                    // all compaction is expired
                    lock_info.set_lock_id(request->lock_id());
                    lock_info.set_expiration(expiration);
                    lock_info.clear_initiators();
                    lock_info.add_initiators(request->initiator());
                    std::string current_lock_msg =
                            std::to_string(expired_compaction_num) + " compaction is expired";
                    if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id,
                                                           request->lock_id(), request->initiator(),
                                                           lock_key, lock_info, current_lock_msg)) {
                        return;
                    }
                }
            }
        }
        err = txn->commit();
        TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock:commit:conflict", &first_retry,
                                 &err);
        if (err == TxnErrorCode::TXN_OK) {
            break;
        } else if (err == TxnErrorCode::TXN_CONFLICT && lock_key_not_found &&
                   request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID &&
                   config::delete_bitmap_enable_retry_txn_conflict && first_retry) {
            // if err is TXN_CONFLICT, and the lock id is -1, do a fast retry
            LOG(INFO) << "fast retry to get_delete_bitmap_update_lock, table_id="
                      << request->table_id() << " lock_id=" << request->lock_id()
                      << ", initiator=" << request->initiator() << ", err=" << err;
            first_retry = false;
            continue;
        } else {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to get_delete_bitmap_update_lock, lock_id=" << request->lock_id()
               << ", initiator=" << request->initiator() << ", err=" << err;
            msg = ss.str();
            return;
        }
    }
    bool require_tablet_stats =
            request->has_require_compaction_stats() ? request->require_compaction_stats() : false;
    if (!require_tablet_stats) return;
    // this request is from fe when it commits txn for MOW table, we send the compaction stats
    // along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let
    // BE eliminate unnecessary sync_rowsets() calls if possible
    // 1. hold the delete bitmap update lock in MS(update lock_info.lock_id to current load's txn id)
    // 2. read tablets' stats
    // 3. check whether we still hold the delete bitmap update lock
    // these steps can be done in different fdb txns
    StopWatch read_stats_sw;
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to init txn";
        return;
    }
    for (const auto& tablet_idx : request->tablet_indexes()) {
        // 1. get compaction cnts
        TabletStatsPB tablet_stat;
        std::string stats_key =
                stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                  tablet_idx.partition_id(), tablet_idx.tablet_id()});
        std::string stats_val;
        err = txn->get(stats_key, &stats_val);
        TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error",
                                 &err);
        if (err == TxnErrorCode::TXN_TOO_OLD) {
            // The read txn outlived its lifetime; continue with a fresh one.
            code = MetaServiceCode::OK;
            err = txn_kv_->create_txn(&txn);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::CREATE>(err);
                ss << "failed to init txn when get tablet stats";
                msg = ss.str();
                return;
            }
            err = txn->get(stats_key, &stats_val);
        }
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
                              tablet_idx.tablet_id());
            return;
        }
        if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = fmt::format("malformed tablet stats value, key={}", hex(stats_key));
            return;
        }
        response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
        response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
        response->add_cumulative_points(tablet_stat.cumulative_point());
        // 2. get tablet states
        std::string tablet_meta_key =
                meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                 tablet_idx.partition_id(), tablet_idx.tablet_id()});
        std::string tablet_meta_val;
        err = txn->get(tablet_meta_key, &tablet_meta_val);
        if (err != TxnErrorCode::TXN_OK) {
            ss << "failed to get tablet meta"
               << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
               << " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id()
               << " key=" << hex(tablet_meta_key) << " err=" << err;
            msg = ss.str();
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            return;
        }
        doris::TabletMetaCloudPB tablet_meta;
        if (!tablet_meta.ParseFromString(tablet_meta_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = "malformed tablet meta";
            return;
        }
        response->add_tablet_states(
                static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
    }
    read_stats_sw.pause();
    LOG(INFO) << fmt::format(
            "tablet_idxes.size()={}, read tablet compaction cnts and tablet states cost={} ms",
            request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);
    // 3. The stats above were read in a separate txn; make sure the lock is still ours.
    // On failure, check_delete_bitmap_lock sets code/msg itself; we only log here.
    DeleteBitmapUpdateLockPB lock_info_tmp;
    if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(),
                                  request->initiator(), lock_key, lock_info_tmp, "v2")) {
        LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats and tablet "
                        "states, table_id="
                     << table_id << " request lock_id=" << request->lock_id()
                     << " request initiator=" << request->initiator() << " code=" << code
                     << " msg=" << msg;
    }
}