void MetaServiceImpl::get_delete_bitmap_update_lock_v2()

in cloud/src/meta-service/meta_service.cpp [2433:2773]


/**
 * RPC handler: acquire the table-level delete bitmap update lock (v2 protocol) for
 * `request->table_id()` on behalf of `request->initiator()`.
 *
 * The lock kv is `meta_delete_bitmap_update_lock_key({instance_id, table_id, -1})` and stores a
 * DeleteBitmapUpdateLockPB. Two kinds of requesters are handled differently:
 *  - compaction (`request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID`): several compactions
 *    may share the lock; each one is recorded in its own mow tablet compaction kv (written by
 *    put_mow_tablet_compaction_key(), presumably keyed by initiator — confirm there).
 *  - any other lock_id (e.g. a load): lock_id, expiration and the list of initiators are kept in
 *    the DeleteBitmapUpdateLockPB itself.
 *
 * Conflict rules, by current holder:
 *  - no lock kv: the requester takes the lock.
 *  - a non-compaction lock_id: taken over if it is expired (expiration > 0 && expiration < now)
 *    or if it carries the requested lock_id (the initiator is then added at most once);
 *    otherwise LOCK_CONFLICT.
 *  - compaction: another compaction just adds its own kv; a non-compaction requester succeeds
 *    only if every compaction kv of the table is expired (those kvs are removed in the same
 *    txn), otherwise LOCK_CONFLICT.
 *
 * If the commit fails with TXN_CONFLICT for a compaction requester that (re)created the
 * compaction lock, one fast retry is made when config::delete_bitmap_enable_retry_txn_conflict
 * is enabled.
 *
 * When `request->require_compaction_stats()` is true, the response is additionally filled, in
 * `request->tablet_indexes()` order, with each tablet's base/cumulative compaction counts,
 * cumulative point and tablet state; finally check_delete_bitmap_lock() re-verifies that the
 * lock is still held (these reads may use a different txn than the one that took the lock).
 *
 * Errors are reported through `code`/`msg` (presumably declared by RPC_PREPROCESS).
 */
void MetaServiceImpl::get_delete_bitmap_update_lock_v2(
        google::protobuf::RpcController* controller,
        const GetDeleteBitmapUpdateLockRequest* request,
        GetDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done) {
    VLOG_DEBUG << "get delete bitmap update lock in v2 for table=" << request->table_id()
               << ",lock id=" << request->lock_id() << ",initiator=" << request->initiator();
    RPC_PREPROCESS(get_delete_bitmap_update_lock);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }

    RPC_RATE_LIMIT(get_delete_bitmap_update_lock)
    auto table_id = request->table_id();
    std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
    bool first_retry = true;
    int64_t retry = 0;
    // At most two attempts; the second one is only reached through the fast-retry path below.
    while (retry <= 1) {
        retry++;
        response->Clear();
        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv_->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            msg = "failed to init txn";
            return;
        }
        std::string lock_val;
        DeleteBitmapUpdateLockPB lock_info;
        err = txn->get(lock_key, &lock_val);
        if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            ss << "failed to get delete bitmap update lock, instance_id=" << instance_id
               << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err;
            msg = ss.str();
            code = MetaServiceCode::KV_TXN_GET_ERR;
            return;
        }
        using namespace std::chrono;
        int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
        int64_t expiration = now + request->expiration();
        // Set when this attempt (re)creates the compaction lock; only such attempts are
        // eligible for the fast retry on TXN_CONFLICT after the commit.
        bool lock_key_not_found = false;

        // Make the requesting compaction the (only) compaction holder: remove every mow tablet
        // compaction kv of this table, then write the requester's own kv. Returns false, with
        // code/msg already set by put_mow_tablet_compaction_key, on failure.
        auto reset_mow_tablet_compaction_keys = [&](std::string& current_lock_msg) {
            // in normal case, this should remove 0 kvs
            // but when upgrade ms, if there are ms with old and new versions, it works
            std::string tablet_compaction_key_begin =
                    mow_tablet_compaction_key({instance_id, table_id, 0});
            std::string tablet_compaction_key_end =
                    mow_tablet_compaction_key({instance_id, table_id, INT64_MAX});
            txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end);
            LOG(INFO) << "remove mow tablet compaction kv, begin="
                      << hex(tablet_compaction_key_begin)
                      << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id;
            return put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id,
                                                 request->lock_id(), request->initiator(),
                                                 expiration, current_lock_msg);
        };

        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            // Nobody holds the lock: the requester takes it.
            lock_key_not_found = true;
            std::string current_lock_msg = "lock key not found";
            lock_info.set_lock_id(request->lock_id());
            // compaction does not use this expiration, only used when upgrade ms
            lock_info.set_expiration(expiration);
            if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) {
                lock_info.add_initiators(request->initiator());
            } else if (!reset_mow_tablet_compaction_keys(current_lock_msg)) {
                return;
            }
            if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(),
                                                   request->initiator(), lock_key, lock_info,
                                                   current_lock_msg)) {
                return;
            }
        } else if (err == TxnErrorCode::TXN_OK) {
            if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                msg = "failed to parse DeleteBitmapUpdateLockPB";
                return;
            }
            if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) {
                // Current holder is not compaction: take over if expired or same lock_id.
                if (lock_info.expiration() > 0 && lock_info.expiration() < now) {
                    LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id="
                              << lock_info.lock_id() << " table_id=" << table_id
                              << " expiration=" << lock_info.expiration() << " now=" << now
                              << " initiator_size=" << lock_info.initiators_size();
                    lock_info.clear_initiators();
                } else if (lock_info.lock_id() != request->lock_id()) {
                    ss << "already be locked by lock_id=" << lock_info.lock_id()
                       << " expiration=" << lock_info.expiration() << " now=" << now
                       << ", request lock_id=" << request->lock_id() << " table_id=" << table_id
                       << " initiator=" << request->initiator();
                    msg = ss.str();
                    code = MetaServiceCode::LOCK_CONFLICT;
                    return;
                }
                std::string current_lock_msg =
                        "original lock_id=" + std::to_string(lock_info.lock_id());
                lock_info.set_lock_id(request->lock_id());
                // compaction does not use the expiration, only used when upgrade ms
                lock_info.set_expiration(expiration);
                if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) {
                    // Record the initiator only once.
                    bool found = false;
                    for (auto initiator : lock_info.initiators()) {
                        if (request->initiator() == initiator) {
                            found = true;
                            break;
                        }
                    }
                    if (!found) {
                        lock_info.add_initiators(request->initiator());
                    }
                } else {
                    // Compaction takes over an expired non-compaction lock; treated like a
                    // freshly created compaction lock for the fast-retry decision.
                    lock_key_not_found = true;
                    if (!reset_mow_tablet_compaction_keys(current_lock_msg)) {
                        return;
                    }
                }
                if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(),
                                                       request->initiator(), lock_key, lock_info,
                                                       current_lock_msg)) {
                    return;
                }
            } else {
                // Current holder is compaction.
                if (request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID) {
                    // Compactions share the lock: just add this initiator's compaction kv.
                    std::string current_lock_msg = "locked by lock_id=-1";
                    if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id,
                                                       request->lock_id(), request->initiator(),
                                                       expiration, current_lock_msg)) {
                        return;
                    }
                } else {
                    // check if compaction key is expired
                    bool has_unexpired_compaction = false;
                    int64_t unexpired_expiration = 0;
                    std::string key0 = mow_tablet_compaction_key({instance_id, table_id, 0});
                    std::string key1 = mow_tablet_compaction_key({instance_id, table_id + 1, 0});
                    MowTabletCompactionPB mow_tablet_compaction;
                    std::unique_ptr<RangeGetIterator> it;
                    int64_t expired_compaction_num = 0;
                    do {
                        err = txn->get(key0, key1, &it);
                        if (err != TxnErrorCode::TXN_OK) {
                            code = cast_as<ErrCategory::READ>(err);
                            ss << "internal error, failed to get mow tablet compaction, err="
                               << err;
                            msg = ss.str();
                            LOG(WARNING) << msg;
                            return;
                        }

                        // Remove expired compaction kvs; stop at the first unexpired one.
                        while (it->has_next() && !has_unexpired_compaction) {
                            auto [k, v] = it->next();
                            if (!mow_tablet_compaction.ParseFromArray(v.data(), v.size()))
                                    [[unlikely]] {
                                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                                msg = "failed to parse MowTabletCompactionPB";
                                return;
                            }
                            if (mow_tablet_compaction.expiration() > 0 &&
                                mow_tablet_compaction.expiration() < now) {
                                LOG(INFO) << "remove mow tablet compaction lock. table_id="
                                          << table_id << " lock_id=" << lock_info.lock_id()
                                          << " expiration=" << mow_tablet_compaction.expiration()
                                          << " now=" << now << " key=" << hex(k);
                                txn->remove(k);
                                expired_compaction_num++;
                            } else {
                                has_unexpired_compaction = true;
                                unexpired_expiration = mow_tablet_compaction.expiration();
                            }
                        }
                        key0 = it->next_begin_key(); // Update to next smallest key for iteration
                    } while (it->more() && !has_unexpired_compaction);
                    if (has_unexpired_compaction) {
                        // TODO print initiator
                        ss << "already be locked by lock_id=" << lock_info.lock_id()
                           << " expiration=" << unexpired_expiration << " now=" << now
                           << ". request lock_id=" << request->lock_id() << " table_id=" << table_id
                           << " initiator=" << request->initiator();
                        msg = ss.str();
                        code = MetaServiceCode::LOCK_CONFLICT;
                        return;
                    }
                    // all compaction is expired
                    lock_info.set_lock_id(request->lock_id());
                    lock_info.set_expiration(expiration);
                    lock_info.clear_initiators();
                    lock_info.add_initiators(request->initiator());
                    std::string current_lock_msg =
                            std::to_string(expired_compaction_num) + " compaction is expired";
                    if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id,
                                                           request->lock_id(), request->initiator(),
                                                           lock_key, lock_info, current_lock_msg)) {
                        return;
                    }
                }
            }
        }

        err = txn->commit();
        TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock:commit:conflict", &first_retry,
                                 &err);
        if (err == TxnErrorCode::TXN_OK) {
            break;
        } else if (err == TxnErrorCode::TXN_CONFLICT && lock_key_not_found &&
                   request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID &&
                   config::delete_bitmap_enable_retry_txn_conflict && first_retry) {
            // if err is TXN_CONFLICT, and the lock id is -1, do a fast retry
            LOG(INFO) << "fast retry to get_delete_bitmap_update_lock, table_id="
                      << request->table_id() << " lock_id=" << request->lock_id()
                      << ", initiator=" << request->initiator() << ", err=" << err;
            first_retry = false;
            continue;
        } else {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to get_delete_bitmap_update_lock, lock_id=" << request->lock_id()
               << ", initiator=" << request->initiator() << ", err=" << err;
            msg = ss.str();
            return;
        }
    }

    bool require_tablet_stats =
            request->has_require_compaction_stats() ? request->require_compaction_stats() : false;
    if (!require_tablet_stats) return;
    // this request is from fe when it commits txn for MOW table, we send the compaction stats
    // along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let
    // BE eliminate unnecessary sync_rowsets() calls if possible

    // 1. hold the delete bitmap update lock in MS(update lock_info.lock_id to current load's txn id)
    // 2. read tablets' stats
    // 3. check whether we still hold the delete bitmap update lock
    // these steps can be done in different fdb txns

    StopWatch read_stats_sw;
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to init txn";
        return;
    }

    for (const auto& tablet_idx : request->tablet_indexes()) {
        // 1. get compaction cnts
        TabletStatsPB tablet_stat;
        std::string stats_key =
                stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                  tablet_idx.partition_id(), tablet_idx.tablet_id()});
        std::string stats_val;
        err = txn->get(stats_key, &stats_val);
        TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error",
                                 &err);
        if (err == TxnErrorCode::TXN_TOO_OLD) {
            // The read txn became too old while iterating many tablets: continue with a new one.
            code = MetaServiceCode::OK;
            err = txn_kv_->create_txn(&txn);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::CREATE>(err);
                ss << "failed to init txn when get tablet stats";
                msg = ss.str();
                return;
            }
            err = txn->get(stats_key, &stats_val);
        }
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
                              tablet_idx.tablet_id());
            return;
        }
        if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = fmt::format("malformed tablet stats value, key={}", hex(stats_key));
            return;
        }
        response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
        response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
        response->add_cumulative_points(tablet_stat.cumulative_point());

        // 2. get tablet states
        std::string tablet_meta_key =
                meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                 tablet_idx.partition_id(), tablet_idx.tablet_id()});
        std::string tablet_meta_val;
        err = txn->get(tablet_meta_key, &tablet_meta_val);
        if (err == TxnErrorCode::TXN_TOO_OLD) {
            // Same recovery as for the stats read above: the snapshot may expire between
            // the two reads, so continue with a fresh read txn.
            err = txn_kv_->create_txn(&txn);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::CREATE>(err);
                msg = "failed to init txn when get tablet meta";
                return;
            }
            err = txn->get(tablet_meta_key, &tablet_meta_val);
        }
        if (err != TxnErrorCode::TXN_OK) {
            ss << "failed to get tablet meta"
               << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
               << " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id()
               << " key=" << hex(tablet_meta_key) << " err=" << err;
            msg = ss.str();
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            return;
        }
        doris::TabletMetaCloudPB tablet_meta;
        if (!tablet_meta.ParseFromString(tablet_meta_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = "malformed tablet meta";
            return;
        }
        response->add_tablet_states(
                static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
    }

    read_stats_sw.pause();
    LOG(INFO) << fmt::format(
            "tablet_idxes.size()={}, read tablet compaction cnts and tablet states cost={} ms",
            request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);

    // 3. make sure the lock taken above is still ours after the (possibly slow) stats reads.
    DeleteBitmapUpdateLockPB lock_info_tmp;
    if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(),
                                  request->initiator(), lock_key, lock_info_tmp, "v2")) {
        LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats and tablet "
                        "states, table_id="
                     << table_id << " request lock_id=" << request->lock_id()
                     << " request initiator=" << request->initiator() << " code=" << code
                     << " msg=" << msg;
    }
}