in be/src/olap/data_dir.cpp [345:668]
Status DataDir::load() {
LOG(INFO) << "start to load tablets from " << _path;
// Load rowset metas from the meta env and create rowsets:
//   COMMITTED: add to the txn manager
//   VISIBLE:   add to the tablet
// If any rowset fails to load, the whole data dir is not loaded.
// The incompatible old tablet format must be checked first: when old metas are
// present, loading them directly may lead to data loss.
RETURN_IF_ERROR(_check_incompatible_old_format_tablet());
std::vector<RowsetMetaSharedPtr> dir_rowset_metas;
LOG(INFO) << "begin loading rowset from meta";
auto load_rowset_func = [&dir_rowset_metas, this](TabletUid tablet_uid, RowsetId rowset_id,
std::string_view meta_str) -> bool {
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
bool parsed = rowset_meta->init(meta_str);
if (!parsed) {
LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
// returning false would stop the meta iterator; return true to skip this rowset and continue
return true;
}
if (rowset_meta->has_delete_predicate()) {
// keep a copy of the v1 sub predicates to verify against after the conversion
auto orig_delete_sub_pred = rowset_meta->delete_predicate().sub_predicates();
auto* delete_pred = rowset_meta->mutable_delete_pred_pb();
if ((!delete_pred->sub_predicates().empty() &&
delete_pred->sub_predicates_v2().empty()) ||
(!delete_pred->in_predicates().empty() &&
!delete_pred->in_predicates()[0].has_column_unique_id())) {
// convert the predicate and write it back only when the v2 sub predicates are not set,
// or an IN-list predicate still needs its column unique id to be set
RETURN_IF_ERROR(DeleteHandler::convert_to_sub_pred_v2(
delete_pred, rowset_meta->tablet_schema()));
LOG(INFO) << fmt::format(
"convert rowset with old delete pred: rowset_id={}, tablet_id={}",
rowset_id.to_string(), tablet_uid.to_string());
CHECK_EQ(orig_delete_sub_pred.size(), delete_pred->sub_predicates().size())
<< "inconsistent sub predicate v1 after conversion";
for (size_t i = 0; i < orig_delete_sub_pred.size(); ++i) {
CHECK_STREQ(orig_delete_sub_pred.Get(i).c_str(),
delete_pred->sub_predicates().Get(i).c_str())
<< "inconsistent sub predicate v1 after conversion";
}
std::string result;
rowset_meta->serialize(&result);
std::string key =
ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
RETURN_IF_ERROR(_meta->put(META_COLUMN_FAMILY_INDEX, key, result));
}
}
if (rowset_meta->partition_id() == 0) {
LOG(WARNING) << "rs tablet=" << rowset_meta->tablet_id() << " rowset_id=" << rowset_id
<< " load from meta but partition id eq 0";
}
dir_rowset_metas.push_back(rowset_meta);
return true;
};
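// Iterate over every rowset meta entry persisted in the meta store and feed it to
// load_rowset_func above; entries that fail to parse are skipped so that one bad
// entry does not abort the whole traversal.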
MonotonicStopWatch rs_timer;
rs_timer.start();
Status load_rowset_status = RowsetMetaManager::traverse_rowset_metas(_meta, load_rowset_func);
rs_timer.stop();
if (!load_rowset_status) {
LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path;
} else {
LOG(INFO) << "load rowset from meta finished, cost: "
<< rs_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
}
// load tablet
// create tablet from tablet meta and add it to tablet mgr
LOG(INFO) << "begin loading tablet from meta";
std::set<int64_t> tablet_ids;
std::set<int64_t> failed_tablet_ids;
auto load_tablet_func = [this, &tablet_ids, &failed_tablet_ids](
int64_t tablet_id, int32_t schema_hash,
std::string_view value) -> bool {
Status status = _engine.tablet_manager()->load_tablet_from_meta(
this, tablet_id, schema_hash, value, false, false, false, false);
if (!status.ok() && !status.is<TABLE_ALREADY_DELETED_ERROR>() &&
!status.is<ENGINE_INSERT_OLD_TABLET>()) {
// load_tablet_from_meta() may return Status::Error<TABLE_ALREADY_DELETED_ERROR>(),
// which means the tablet status is DELETED.
// This can happen when the tablet was deleted just before the BE restarted but has
// not yet been cleared from rocksdb. On restart, the BE reads the tablet in the
// DELETED state from rocksdb. Such tablets have already been added to the garbage
// collection queue and will be deleted automatically later, so this case is not
// treated as a failure.
// load_tablet_from_meta() may also return Status::Error<ENGINE_INSERT_OLD_TABLET>()
// when the BE is restarting and an older tablet has been added to the garbage
// collection queue but not deleted yet.
// Since data_dirs are loaded in parallel, a tablet loaded later may be older than
// one loaded earlier, which should not be treated as a failure either.
LOG(WARNING) << "load tablet from header failed. status:" << status
<< ", tablet=" << tablet_id << "." << schema_hash;
failed_tablet_ids.insert(tablet_id);
} else {
tablet_ids.insert(tablet_id);
}
return true;
};
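// Walk all tablet headers in the meta store and register each tablet with the
// tablet manager; failures other than the benign cases handled above are recorded
// in failed_tablet_ids.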
MonotonicStopWatch tablet_timer;
tablet_timer.start();
Status load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func);
tablet_timer.stop();
if (!failed_tablet_ids.empty()) {
LOG(WARNING) << "load tablets from header failed"
<< ", loaded tablet: " << tablet_ids.size()
<< ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
if (!config::ignore_load_tablet_failure) {
throw Exception(Status::FatalError(
"load tablets encounter failure. stop BE process. path: {}", _path));
}
}
if (!load_tablet_status) {
LOG(WARNING) << "there is failure when loading tablet headers"
<< ", loaded tablet: " << tablet_ids.size()
<< ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
} else {
LOG(INFO) << "load tablet from meta finished"
<< ", loaded tablet: " << tablet_ids.size()
<< ", error tablet: " << failed_tablet_ids.size()
<< ", cost: " << tablet_timer.elapsed_time_milliseconds()
<< " ms, path: " << _path;
}
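// For tablets whose rowset metas were missing the tablet schema, backfill the
// schema into the rowset metas and persist the updated tablet meta.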
for (int64_t tablet_id : tablet_ids) {
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
RETURN_IF_ERROR(TabletMetaManager::save(this, tablet->tablet_id(),
tablet->schema_hash(), tablet->tablet_meta()));
}
}
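// Replay pending publish tasks persisted in the meta store and re-register each one
// as an async publish task with the storage engine.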
auto load_pending_publish_info_func =
[&engine = _engine](int64_t tablet_id, int64_t publish_version, std::string_view info) {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed = pending_publish_info_pb.ParseFromArray(info.data(), info.size());
if (!parsed) {
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
<< " publish_version: " << publish_version;
}
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
publish_version,
pending_publish_info_pb.transaction_id(), true);
return true;
};
MonotonicStopWatch pending_publish_timer;
pending_publish_timer.start();
RETURN_IF_ERROR(
TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func));
pending_publish_timer.stop();
LOG(INFO) << "load pending publish task from meta finished, cost: "
<< pending_publish_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
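// Sanity check: count rowsets loaded with partition_id == 0. A small number is
// tolerated (they are skipped later), but exceeding the configured threshold aborts
// startup so the problem can be investigated.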
int64_t rowset_partition_id_eq_0_num = 0;
for (auto rowset_meta : dir_rowset_metas) {
if (rowset_meta->partition_id() == 0) {
++rowset_partition_id_eq_0_num;
}
}
if (rowset_partition_id_eq_0_num > config::ignore_invalid_partition_id_rowset_num) {
throw Exception(Status::FatalError(
"rowset partition id eq 0 is {} bigger than config {}, be exit, plz check be.INFO",
rowset_partition_id_eq_0_num, config::ignore_invalid_partition_id_rowset_num));
}
// Traverse the rowsets:
// 1. add committed rowsets to the txn map
// 2. add visible rowsets to their tablets
// Errors when loading a tablet or rowset are ignored, because the FE will repair
// them after the tablet report.
int64_t invalid_rowset_counter = 0;
for (auto&& rowset_meta : dir_rowset_metas) {
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(rowset_meta->tablet_id());
// the tablet may have been dropped without its related rowset metas being dropped
if (tablet == nullptr) {
VLOG_NOTICE << "could not find tablet id: " << rowset_meta->tablet_id()
<< ", schema hash: " << rowset_meta->tablet_schema_hash()
<< ", for rowset: " << rowset_meta->rowset_id() << ", skip this rowset";
++invalid_rowset_counter;
continue;
}
if (rowset_meta->partition_id() == 0) {
LOG(WARNING) << "skip tablet_id=" << tablet->tablet_id()
<< " rowset: " << rowset_meta->rowset_id()
<< " txn: " << rowset_meta->txn_id();
continue;
}
RowsetSharedPtr rowset;
Status create_status = tablet->create_rowset(rowset_meta, &rowset);
if (!create_status) {
LOG(WARNING) << "could not create rowset from rowsetmeta: "
<< " rowset_id: " << rowset_meta->rowset_id()
<< " rowset_type: " << rowset_meta->rowset_type()
<< " rowset_state: " << rowset_meta->rowset_state();
continue;
}
if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb(), false));
}
Status commit_txn_status = _engine.txn_manager()->commit_txn(
_meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
rowset, _engine.pending_local_rowsets().add(rowset_meta->rowset_id()), true);
if (commit_txn_status || commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(INFO) << "successfully to add committed rowset: " << rowset_meta->rowset_id()
<< " to tablet: " << rowset_meta->tablet_id()
<< " schema hash: " << rowset_meta->tablet_schema_hash()
<< " for txn: " << rowset_meta->txn_id();
} else if (commit_txn_status.is<ErrorCode::INTERNAL_ERROR>()) {
LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
<< " to tablet: " << rowset_meta->tablet_id()
<< " for txn: " << rowset_meta->txn_id()
<< " error: " << commit_txn_status;
return commit_txn_status;
} else {
LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
<< " to tablet: " << rowset_meta->tablet_id()
<< " for txn: " << rowset_meta->txn_id()
<< " error: " << commit_txn_status;
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb(), false));
}
Status publish_status = tablet->add_rowset(rowset);
if (!publish_status && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
<< rowset->rowset_id() << " tablet id: " << rowset_meta->tablet_id()
<< " txn id:" << rowset_meta->txn_id()
<< " start_version: " << rowset_meta->version().first
<< " end_version: " << rowset_meta->version().second;
}
} else {
LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id()
<< " with tablet id: " << rowset_meta->tablet_id()
<< " tablet uid: " << rowset_meta->tablet_uid()
<< " schema hash: " << rowset_meta->tablet_schema_hash()
<< " txn: " << rowset_meta->txn_id()
<< " current valid tablet uid: " << tablet->tablet_uid();
++invalid_rowset_counter;
}
}
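// Reload persisted delete bitmaps: for each (tablet, version) entry, attach the
// per-(rowset, segment) bitmaps to the tablet's in-memory delete bitmap, skipping
// rowsets that are no longer referenced by the tablet's rowset metas.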
int64_t dbm_cnt {0};
int64_t unknown_dbm_cnt {0};
auto load_delete_bitmap_func = [this, &dbm_cnt, &unknown_dbm_cnt](int64_t tablet_id,
int64_t version,
std::string_view val) {
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
if (!tablet) {
return true;
}
const std::vector<RowsetMetaSharedPtr>& all_rowsets = tablet->tablet_meta()->all_rs_metas();
RowsetIdUnorderedSet rowset_ids;
for (auto& rowset_meta : all_rowsets) {
rowset_ids.insert(rowset_meta->rowset_id());
}
DeleteBitmapPB delete_bitmap_pb;
delete_bitmap_pb.ParseFromArray(val.data(), val.size());
int rst_ids_size = delete_bitmap_pb.rowset_ids_size();
int seg_ids_size = delete_bitmap_pb.segment_ids_size();
int seg_maps_size = delete_bitmap_pb.segment_delete_bitmaps_size();
CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size);
for (size_t i = 0; i < rst_ids_size; ++i) {
RowsetId rst_id;
rst_id.init(delete_bitmap_pb.rowset_ids(i));
// only process rowsets that are still present in _rs_metas
if (rowset_ids.find(rst_id) == rowset_ids.end()) {
++unknown_dbm_cnt;
continue;
}
++dbm_cnt;
auto seg_id = delete_bitmap_pb.segment_ids(i);
auto iter = tablet->tablet_meta()->delete_bitmap().delete_bitmap.find(
{rst_id, seg_id, version});
// This version of delete bitmap already exists
if (iter != tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) {
continue;
}
auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] =
roaring::Roaring::read(bitmap);
}
return true;
};
MonotonicStopWatch dbm_timer;
dbm_timer.start();
RETURN_IF_ERROR(TabletMetaManager::traverse_delete_bitmap(_meta, load_delete_bitmap_func));
dbm_timer.stop();
LOG(INFO) << "load delete bitmap from meta finished, cost: "
<< dbm_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
// At startup we only count these invalid rowsets and do not delete them here.
// The actual deletion happens in StorageEngine::_clean_unused_rowset_metas,
// which is run uniformly by the background cleanup thread.
LOG(INFO) << "finish to load tablets from " << _path
<< ", total rowset meta: " << dir_rowset_metas.size()
<< ", invalid rowset num: " << invalid_rowset_counter
<< ", visible/stale rowsets' delete bitmap count: " << dbm_cnt
<< ", invalid rowsets' delete bitmap count: " << unknown_dbm_cnt;
return Status::OK();
}