Status DataDir::load()

in be/src/olap/data_dir.cpp [345:668]


// Loads everything persisted in this data dir's meta store (`_meta`) and registers it
// with the storage engine. Phases, in order:
//   1. Fail early if `_check_incompatible_old_format_tablet()` reports an error.
//   2. Traverse all rowset metas. Metas carrying a legacy delete predicate are converted
//      to the v2 sub-predicate form and written back to the meta store.
//   3. Traverse all tablet headers and load each tablet into the tablet manager. Unless
//      `config::ignore_load_tablet_failure` is set, any failed tablet throws a FatalError.
//   4. Re-save the tablet meta of every loaded tablet for which
//      `set_tablet_schema_into_rowset_meta()` returns true.
//   5. Re-enqueue pending async publish tasks (unparsable records are skipped).
//   6. Throw a FatalError if more rowset metas have partition id 0 than
//      `config::ignore_invalid_partition_id_rowset_num` allows.
//   7. Attach rowsets: COMMITTED ones are committed to the txn manager, VISIBLE ones are
//      added to their tablet. Rowsets with partition id 0 or without a tablet are skipped.
//   8. Load delete bitmaps for rowsets present in their tablet's meta (unparsable records
//      are skipped).
//
// Returns a non-OK Status when a meta store write fails, when the pending-publish or
// delete-bitmap traversal fails, or when committing a txn returns INTERNAL_ERROR.
Status DataDir::load() {
    LOG(INFO) << "start to load tablets from " << _path;

    // load rowset meta from meta env and create rowset
    // COMMITTED: add to txn manager
    // VISIBLE: add to tablet
    // if one rowset load failed, then the total data dir will not be loaded

    // necessarily check incompatible old format. when there are old metas, it may load to data missing
    RETURN_IF_ERROR(_check_incompatible_old_format_tablet());

    std::vector<RowsetMetaSharedPtr> dir_rowset_metas;
    // The traversal callback can only return bool, so the first error it hits while
    // converting/persisting a rowset meta is recorded here and merged into
    // `load_rowset_status` after the traversal.
    Status rowset_meta_error = Status::OK();
    LOG(INFO) << "begin loading rowset from meta";
    auto load_rowset_func = [&dir_rowset_metas, &rowset_meta_error, this](
                                    TabletUid tablet_uid, RowsetId rowset_id,
                                    std::string_view meta_str) -> bool {
        // Logs and records `st`, then returns false, which breaks the meta iterator.
        auto stop_with_error = [&](Status st, std::string_view step) -> bool {
            LOG(WARNING) << step << " failed, stop loading rowset meta. rowset_id="
                         << rowset_id.to_string() << ", tablet_uid=" << tablet_uid.to_string()
                         << ", status=" << st;
            rowset_meta_error = std::move(st);
            return false;
        };

        RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
        bool parsed = rowset_meta->init(meta_str);
        if (!parsed) {
            LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
            // return false will break meta iterator, return true to skip this error
            return true;
        }

        if (rowset_meta->has_delete_predicate()) {
            // copy the delete sub pred v1 to check then
            auto orig_delete_sub_pred = rowset_meta->delete_predicate().sub_predicates();
            auto* delete_pred = rowset_meta->mutable_delete_pred_pb();

            if ((!delete_pred->sub_predicates().empty() &&
                 delete_pred->sub_predicates_v2().empty()) ||
                (!delete_pred->in_predicates().empty() &&
                 delete_pred->in_predicates()[0].has_column_unique_id())) {
                // convert pred and write only when delete sub pred v2 is not set or there is in list pred to be set column uid
                Status convert_status = DeleteHandler::convert_to_sub_pred_v2(
                        delete_pred, rowset_meta->tablet_schema());
                if (!convert_status.ok()) {
                    return stop_with_error(std::move(convert_status), "convert delete predicate");
                }
                LOG(INFO) << fmt::format(
                        "convert rowset with old delete pred: rowset_id={}, tablet_id={}",
                        rowset_id.to_string(), tablet_uid.to_string());
                // The conversion must leave the v1 sub predicates untouched.
                CHECK_EQ(orig_delete_sub_pred.size(), delete_pred->sub_predicates().size())
                        << "inconsistent sub predicate v1 after conversion";
                for (size_t i = 0; i < orig_delete_sub_pred.size(); ++i) {
                    CHECK_STREQ(orig_delete_sub_pred.Get(i).c_str(),
                                delete_pred->sub_predicates().Get(i).c_str())
                            << "inconsistent sub predicate v1 after conversion";
                }
                std::string result;
                rowset_meta->serialize(&result);
                std::string key =
                        ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
                Status put_status = _meta->put(META_COLUMN_FAMILY_INDEX, key, result);
                if (!put_status.ok()) {
                    return stop_with_error(std::move(put_status), "persist converted rowset meta");
                }
            }
        }

        if (rowset_meta->partition_id() == 0) {
            LOG(WARNING) << "rs tablet=" << rowset_meta->tablet_id() << " rowset_id=" << rowset_id
                         << " load from meta but partition id eq 0";
        }

        dir_rowset_metas.push_back(rowset_meta);
        return true;
    };
    MonotonicStopWatch rs_timer;
    rs_timer.start();
    Status load_rowset_status = RowsetMetaManager::traverse_rowset_metas(_meta, load_rowset_func);
    rs_timer.stop();
    if (load_rowset_status.ok() && !rowset_meta_error.ok()) {
        // The traversal itself succeeded but was cut short by the callback.
        load_rowset_status = std::move(rowset_meta_error);
    }
    if (!load_rowset_status) {
        // NOTE: despite the message, loading continues below with whatever rowset metas
        // were collected before the failure.
        LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path;
    } else {
        LOG(INFO) << "load rowset from meta finished, cost: "
                  << rs_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
    }

    // load tablet
    // create tablet from tablet meta and add it to tablet mgr
    LOG(INFO) << "begin loading tablet from meta";
    std::set<int64_t> tablet_ids;
    std::set<int64_t> failed_tablet_ids;
    auto load_tablet_func = [this, &tablet_ids, &failed_tablet_ids](
                                    int64_t tablet_id, int32_t schema_hash,
                                    std::string_view value) -> bool {
        Status status = _engine.tablet_manager()->load_tablet_from_meta(
                this, tablet_id, schema_hash, value, false, false, false, false);
        if (!status.ok() && !status.is<TABLE_ALREADY_DELETED_ERROR>() &&
            !status.is<ENGINE_INSERT_OLD_TABLET>()) {
            // load_tablet_from_meta() may return Status::Error<TABLE_ALREADY_DELETED_ERROR>()
            // which means the tablet status is DELETED
            // This may happen when the tablet was just deleted before the BE restarted,
            // but it has not been cleared from rocksdb. At this time, restarting the BE
            // will read the tablet in the DELETE state from rocksdb. These tablets have been
            // added to the garbage collection queue and will be automatically deleted afterwards.
            // Therefore, we believe that this situation is not a failure.

            // Besides, load_tablet_from_meta() may return Status::Error<ENGINE_INSERT_OLD_TABLET>()
            // when BE is restarting and the older tablet have been added to the
            // garbage collection queue but not deleted yet.
            // In this case, since the data_dirs are parallel loaded, a later loaded tablet
            // may be older than previously loaded one, which should not be acknowledged as a
            // failure.
            LOG(WARNING) << "load tablet from header failed. status:" << status
                         << ", tablet=" << tablet_id << "." << schema_hash;
            failed_tablet_ids.insert(tablet_id);
        } else {
            tablet_ids.insert(tablet_id);
        }
        return true;
    };
    MonotonicStopWatch tablet_timer;
    tablet_timer.start();
    Status load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func);
    tablet_timer.stop();
    if (!failed_tablet_ids.empty()) {
        LOG(WARNING) << "load tablets from header failed"
                     << ", loaded tablet: " << tablet_ids.size()
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
        if (!config::ignore_load_tablet_failure) {
            throw Exception(Status::FatalError(
                    "load tablets encounter failure. stop BE process. path: {}", _path));
        }
    }
    if (!load_tablet_status) {
        LOG(WARNING) << "there is failure when loading tablet headers"
                     << ", loaded tablet: " << tablet_ids.size()
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
    } else {
        LOG(INFO) << "load tablet from meta finished"
                  << ", loaded tablet: " << tablet_ids.size()
                  << ", error tablet: " << failed_tablet_ids.size()
                  << ", cost: " << tablet_timer.elapsed_time_milliseconds()
                  << " ms, path: " << _path;
    }

    for (int64_t tablet_id : tablet_ids) {
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
        if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
            RETURN_IF_ERROR(TabletMetaManager::save(this, tablet->tablet_id(),
                                                    tablet->schema_hash(), tablet->tablet_meta()));
        }
    }

    auto load_pending_publish_info_func =
            [&engine = _engine](int64_t tablet_id, int64_t publish_version, std::string_view info) {
                PendingPublishInfoPB pending_publish_info_pb;
                bool parsed = pending_publish_info_pb.ParseFromArray(info.data(), info.size());
                if (!parsed) {
                    LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
                                 << " publish_version: " << publish_version;
                    // Skip the record: enqueueing it would publish with the default-valued
                    // partition id and transaction id of an unparsed message.
                    return true;
                }
                engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
                                              publish_version,
                                              pending_publish_info_pb.transaction_id(), true);
                return true;
            };
    MonotonicStopWatch pending_publish_timer;
    pending_publish_timer.start();
    RETURN_IF_ERROR(
            TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func));
    pending_publish_timer.stop();
    LOG(INFO) << "load pending publish task from meta finished, cost: "
              << pending_publish_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;

    int64_t rowset_partition_id_eq_0_num = 0;
    for (const auto& rowset_meta : dir_rowset_metas) {
        if (rowset_meta->partition_id() == 0) {
            ++rowset_partition_id_eq_0_num;
        }
    }
    if (rowset_partition_id_eq_0_num > config::ignore_invalid_partition_id_rowset_num) {
        throw Exception(Status::FatalError(
                "rowset partition id eq 0 is {} bigger than config {}, be exit, plz check be.INFO",
                rowset_partition_id_eq_0_num, config::ignore_invalid_partition_id_rowset_num));
    }

    // traverse rowset
    // 1. add committed rowset to txn map
    // 2. add visible rowset to tablet
    // ignore any errors when load tablet or rowset, because fe will repair them after report
    int64_t invalid_rowset_counter = 0;
    for (auto&& rowset_meta : dir_rowset_metas) {
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(rowset_meta->tablet_id());
        // tablet maybe dropped, but not drop related rowset meta
        if (tablet == nullptr) {
            VLOG_NOTICE << "could not find tablet id: " << rowset_meta->tablet_id()
                        << ", schema hash: " << rowset_meta->tablet_schema_hash()
                        << ", for rowset: " << rowset_meta->rowset_id() << ", skip this rowset";
            ++invalid_rowset_counter;
            continue;
        }

        if (rowset_meta->partition_id() == 0) {
            LOG(WARNING) << "skip tablet_id=" << tablet->tablet_id()
                         << " rowset: " << rowset_meta->rowset_id()
                         << " txn: " << rowset_meta->txn_id();
            continue;
        }

        RowsetSharedPtr rowset;
        Status create_status = tablet->create_rowset(rowset_meta, &rowset);
        if (!create_status) {
            LOG(WARNING) << "could not create rowset from rowsetmeta: "
                         << " rowset_id: " << rowset_meta->rowset_id()
                         << " rowset_type: " << rowset_meta->rowset_type()
                         << " rowset_state: " << rowset_meta->rowset_state()
                         << " status: " << create_status;
            continue;
        }
        if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
            rowset_meta->tablet_uid() == tablet->tablet_uid()) {
            if (!rowset_meta->tablet_schema()) {
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
                RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
                                                        rowset_meta->rowset_id(),
                                                        rowset_meta->get_rowset_pb(), false));
            }
            Status commit_txn_status = _engine.txn_manager()->commit_txn(
                    _meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
                    rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
                    rowset, _engine.pending_local_rowsets().add(rowset_meta->rowset_id()), true);
            if (commit_txn_status || commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
                LOG(INFO) << "successfully to add committed rowset: " << rowset_meta->rowset_id()
                          << " to tablet: " << rowset_meta->tablet_id()
                          << " schema hash: " << rowset_meta->tablet_schema_hash()
                          << " for txn: " << rowset_meta->txn_id();

            } else if (commit_txn_status.is<ErrorCode::INTERNAL_ERROR>()) {
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
                             << " to tablet: " << rowset_meta->tablet_id()
                             << " for txn: " << rowset_meta->txn_id()
                             << " error: " << commit_txn_status;
                return commit_txn_status;
            } else {
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
                             << " to tablet: " << rowset_meta->tablet_id()
                             << " for txn: " << rowset_meta->txn_id()
                             << " error: " << commit_txn_status;
            }
        } else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
                   rowset_meta->tablet_uid() == tablet->tablet_uid()) {
            if (!rowset_meta->tablet_schema()) {
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
                RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
                                                        rowset_meta->rowset_id(),
                                                        rowset_meta->get_rowset_pb(), false));
            }
            Status publish_status = tablet->add_rowset(rowset);
            if (!publish_status && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
                LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
                             << rowset->rowset_id() << " tablet id: " << rowset_meta->tablet_id()
                             << " txn id:" << rowset_meta->txn_id()
                             << " start_version: " << rowset_meta->version().first
                             << " end_version: " << rowset_meta->version().second;
            }
        } else {
            LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id()
                         << " with tablet id: " << rowset_meta->tablet_id()
                         << " tablet uid: " << rowset_meta->tablet_uid()
                         << " schema hash: " << rowset_meta->tablet_schema_hash()
                         << " txn: " << rowset_meta->txn_id()
                         << " current valid tablet uid: " << tablet->tablet_uid();
            ++invalid_rowset_counter;
        }
    }

    int64_t dbm_cnt {0};
    int64_t unknown_dbm_cnt {0};
    auto load_delete_bitmap_func = [this, &dbm_cnt, &unknown_dbm_cnt](int64_t tablet_id,
                                                                      int64_t version,
                                                                      std::string_view val) {
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
        if (!tablet) {
            return true;
        }

        DeleteBitmapPB delete_bitmap_pb;
        if (!delete_bitmap_pb.ParseFromArray(val.data(), val.size())) {
            // A partially parsed message must not be merged into the tablet's bitmap.
            LOG(WARNING) << "parse delete bitmap failed, skip it. tablet_id: " << tablet_id
                         << " version: " << version;
            return true;
        }
        int rst_ids_size = delete_bitmap_pb.rowset_ids_size();
        int seg_ids_size = delete_bitmap_pb.segment_ids_size();
        int seg_maps_size = delete_bitmap_pb.segment_delete_bitmaps_size();
        CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size);

        const std::vector<RowsetMetaSharedPtr>& all_rowsets = tablet->tablet_meta()->all_rs_metas();
        RowsetIdUnorderedSet rowset_ids;
        for (const auto& rowset_meta : all_rowsets) {
            rowset_ids.insert(rowset_meta->rowset_id());
        }

        auto& delete_bitmap = tablet->tablet_meta()->delete_bitmap().delete_bitmap;
        for (int i = 0; i < rst_ids_size; ++i) {
            RowsetId rst_id;
            rst_id.init(delete_bitmap_pb.rowset_ids(i));
            // only process the rowset in _rs_metas
            if (rowset_ids.find(rst_id) == rowset_ids.end()) {
                ++unknown_dbm_cnt;
                continue;
            }
            ++dbm_cnt;
            auto seg_id = delete_bitmap_pb.segment_ids(i);
            // This version of delete bitmap already exists
            if (delete_bitmap.find({rst_id, seg_id, version}) != delete_bitmap.end()) {
                continue;
            }
            // Bound deserialization by the stored length; the bytes come from disk.
            const auto& bitmap = delete_bitmap_pb.segment_delete_bitmaps(i);
            delete_bitmap[{rst_id, seg_id, version}] =
                    roaring::Roaring::readSafe(bitmap.data(), bitmap.size());
        }
        return true;
    };
    MonotonicStopWatch dbm_timer;
    dbm_timer.start();
    RETURN_IF_ERROR(TabletMetaManager::traverse_delete_bitmap(_meta, load_delete_bitmap_func));
    dbm_timer.stop();

    LOG(INFO) << "load delete bitmap from meta finished, cost: "
              << dbm_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;

    // At startup, we only count these invalid rowset, but do not actually delete it.
    // The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
    // which is cleaned up uniformly by the background cleanup thread.
    LOG(INFO) << "finish to load tablets from " << _path
              << ", total rowset meta: " << dir_rowset_metas.size()
              << ", invalid rowset num: " << invalid_rowset_counter
              << ", visible/stale rowsets' delete bitmap count: " << dbm_cnt
              << ", invalid rowsets' delete bitmap count: " << unknown_dbm_cnt;

    return Status::OK();
}