in cloud/src/meta-service/meta_service_txn.cpp [1567:1908]
/**
 * Commits a load transaction in "eventual" (lazy) mode.
 *
 * Within a single KV transaction this function:
 *   1. loads the current partition VersionPBs touched by `tmp_rowsets_meta`;
 *   2. marks the txn info as TXN_STATUS_COMMITTED (not VISIBLE);
 *   3. rewrites every touched partition VersionPB with `txn_id` recorded as a pending txn;
 *   4. bumps the table versions with an atomic add;
 * and commits it. Afterwards it submits the txn to `txn_lazy_committer` and waits for the
 * lazy commit task, then fills `response` (table stats, txn info reported as VISIBLE).
 *
 * The body is a retry loop. It restarts from scratch (with a fresh KV transaction) when
 *   - the tablet indexes had to be repaired, or
 *   - a partition VersionPB still carries a pending txn id; that txn is first advanced
 *     through the lazy committer.
 *
 * @param request             commit request; supplies txn_id, the 2PC flag, the commit
 *                            attachment and the base tablet ids.
 * @param response            receives version_update_time_ms, table/partition ids with their
 *                            new versions, table stats and the txn info.
 * @param txn_kv              KV store used to create the transaction(s).
 * @param txn_lazy_committer  executor used both to advance a previously pending txn and to
 *                            make this txn visible.
 * @param code                out: result code. Every early-return error path in this function
 *                            sets it before returning, except the test-only early return at
 *                            the lazy-committer submit sync point; it is not written on the
 *                            success path.
 * @param msg                 out: human readable error/info message.
 * @param instance_id         instance the txn belongs to (also used as the metrics key).
 * @param db_id               database id of the txn.
 * @param tmp_rowsets_meta    rowset metas written by this txn; only the RowsetMetaCloudPB
 *                            (pair.second) is read here. NOTE(review): pair.first is presumably
 *                            the tmp rowset key -- confirm against the caller.
 */
void commit_txn_eventually(
        const CommitTxnRequest* request, CommitTxnResponse* response,
        std::shared_ptr<TxnKv>& txn_kv, std::shared_ptr<TxnLazyCommitter>& txn_lazy_committer,
        MetaServiceCode& code, std::string& msg, const std::string& instance_id, int64_t db_id,
        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta) {
    StopWatch sw;
    // Scope guard: the deleter runs when the function exits (on every return path) and
    // records the elapsed time. The pointer value itself is a non-null dummy.
    std::unique_ptr<int, std::function<void(int*)>> defer_status((int*)0x01, [&](int*) {
        if (config::use_detailed_metrics && !instance_id.empty()) {
            g_bvar_ms_commit_txn_eventually.put(instance_id, sw.elapsed_us());
        }
    });

    std::stringstream ss;
    TxnErrorCode err = TxnErrorCode::TXN_OK;
    int64_t txn_id = request->txn_id();

    do {
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
        int64_t last_pending_txn_id = 0;

        // Every attempt works on a fresh KV transaction.
        std::unique_ptr<Transaction> txn;
        err = txn_kv->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // tablet_id -> {table/index/partition}_id
        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
        // table_id -> tablets_ids
        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
        bool need_repair_tablet_idx = false;
        get_tablet_indexes(instance_id, txn_id, tmp_rowsets_meta, txn, code, msg, &tablet_ids,
                           &table_id_tablet_ids, &need_repair_tablet_idx);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "get_tablet_indexes failed, txn_id=" << txn_id << " code=" << code;
            return;
        }

        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::need_repair_tablet_idx",
                                 &need_repair_tablet_idx);
        if (need_repair_tablet_idx) {
            // Drop the current KV txn before repairing; the repair helper receives txn_kv
            // rather than this txn. Then retry the whole commit.
            txn.reset();
            repair_tablet_index(txn_kv, code, msg, instance_id, db_id, txn_id, tmp_rowsets_meta);
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "repair_tablet_index failed, txn_id=" << txn_id << " code=" << code;
                return;
            }
            continue;
        }

        // <partition_version_key, version>
        std::unordered_map<std::string, uint64_t> new_versions;
        // Distinct partition version keys, in first-seen order, for the batch read below.
        std::vector<std::string> version_keys;
        for (auto& [_, i] : tmp_rowsets_meta) {
            int64_t tablet_id = i.tablet_id();
            int64_t table_id = tablet_ids[tablet_id].table_id();
            int64_t partition_id = i.partition_id();
            std::string ver_key =
                    partition_version_key({instance_id, db_id, table_id, partition_id});
            if (new_versions.count(ver_key) == 0) {
                new_versions.insert({ver_key, 0});
                version_keys.push_back(std::move(ver_key));
            }
        }

        std::vector<std::optional<std::string>> version_values;
        err = txn->batch_get(&version_values, version_keys);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get partition versions, err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg << " txn_id=" << txn_id;
            return;
        }

        // Resolve the current version of every partition. Stop at the first partition that
        // still has a pending txn: that txn must be advanced before this one can commit.
        for (size_t i = 0; i < version_keys.size(); i++) {
            int64_t version;
            if (version_values[i].has_value()) {
                VersionPB version_pb;
                if (!version_pb.ParseFromString(version_values[i].value())) {
                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                    ss << "failed to parse version pb txn_id=" << txn_id
                       << " key=" << hex(version_keys[i]);
                    msg = ss.str();
                    return;
                }
                if (version_pb.pending_txn_ids_size() > 0) {
                    DCHECK(version_pb.pending_txn_ids_size() == 1);
                    last_pending_txn_id = version_pb.pending_txn_ids(0);
                    DCHECK(last_pending_txn_id > 0);
                    break;
                }
                version = version_pb.version();
            } else {
                // No version record for this partition yet: start from version 1.
                version = 1;
            }
            new_versions[version_keys[i]] = version;
            last_pending_txn_id = 0;
        }

        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::last_pending_txn_id",
                                 &last_pending_txn_id);
        if (last_pending_txn_id > 0) {
            TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::advance_last_pending_txn_id",
                                     &last_pending_txn_id);
            // Release the KV txn before blocking on the lazy commit of the pending txn.
            txn.reset();
            std::shared_ptr<TxnLazyCommitTask> task =
                    txn_lazy_committer->submit(instance_id, last_pending_txn_id);
            std::tie(code, msg) = task->wait();
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
            last_pending_txn_id = 0;
            // there maybe concurrent commit_txn_eventually, so we need continue to make sure
            // partition versionPB has no txn_id
            continue;
        }

        // Load and validate the txn info.
        std::string info_val;
        const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
               << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        TxnInfoPB txn_info;
        if (!txn_info.ParseFromString(info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();

        DCHECK(txn_info.txn_id() == txn_id);
        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            code = MetaServiceCode::TXN_ALREADY_ABORTED;
            ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
            // Already visible: an error for a 2PC commit, an idempotent success otherwise.
            // In both cases the stored txn info is returned to the caller.
            if (request->has_is_2pc() && request->is_2pc()) {
                code = MetaServiceCode::TXN_ALREADY_VISIBLE;
                ss << "transaction [" << txn_id << "] is already visible, not pre-committed.";
                msg = ss.str();
                LOG(INFO) << msg;
                response->mutable_txn_info()->CopyFrom(txn_info);
                return;
            }
            code = MetaServiceCode::OK;
            ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(INFO) << msg;
            response->mutable_txn_info()->CopyFrom(txn_info);
            return;
        }

        if (request->has_is_2pc() && request->is_2pc() &&
            txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
            code = MetaServiceCode::TXN_INVALID_STATUS;
            ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id="
               << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // Reject the commit once prepare_time + timeout_ms lies in the past.
        auto now_time = system_clock::now();
        uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
        if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
            LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
                      << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
            return;
        }
        txn_info.set_commit_time(commit_time);
        txn_info.set_finish_time(commit_time);
        if (request->has_commit_attachment()) {
            txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
        }
        DCHECK(txn_info.status() != TxnStatusPB::TXN_STATUS_COMMITTED);
        // set status TXN_STATUS_COMMITTED not TXN_STATUS_VISIBLE !!!
        // lazy commit task will advance txn to make txn visible
        txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);

        LOG(INFO) << "after update txn_id= " << txn_id
                  << " txn_info=" << txn_info.ShortDebugString();
        info_val.clear();
        if (!txn_info.SerializeToString(&info_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        txn->put(info_key, info_val);
        LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id;

        if (txn_info.load_job_source_type() ==
            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
            put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
            // `code` is OK on entry here (all earlier failures returned), so a non-OK value
            // comes from put_routine_load_progress. Bail out right away instead of doing
            // further work and reporting the failure later under a misleading message.
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "put_routine_load_progress failed, txn_id=" << txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
        }

        // save versions for partition
        int64_t version_update_time_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        response->set_version_update_time_ms(version_update_time_ms);
        for (auto& i : new_versions) {
            std::string ver_val;
            VersionPB version_pb;
            // The version itself is NOT bumped here; the txn is only recorded as pending.
            version_pb.add_pending_txn_ids(txn_id);
            version_pb.set_update_time_ms(version_update_time_ms);
            // Version 1 is the "no record yet" default, so it is left unset.
            if (i.second > 1) {
                version_pb.set_version(i.second);
            }
            if (!version_pb.SerializeToString(&ver_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize version_pb when saving, txn_id=" << txn_id
                   << " partiton_key=" << hex(i.first);
                msg = ss.str();
                return;
            }

            txn->put(i.first, ver_val);
            LOG(INFO) << "put partition_version_key=" << hex(i.first) << " version:" << i.second
                      << " txn_id=" << txn_id << " update_time=" << version_update_time_ms;

            std::string_view ver_key = i.first;
            ver_key.remove_prefix(1); // Remove key space
            // PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id}
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
            int ret = decode_key(&ver_key, &out);
            if (ret != 0) [[unlikely]] {
                // decode version key error means this is something wrong,
                // we can not continue this txn
                LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(ver_key);
                code = MetaServiceCode::UNDEFINED_ERR;
                msg = "decode version key error";
                return;
            }

            // Decoded fields 4 and 5 are read as table_id and partition_id.
            int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
            int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
            VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
                       << " partition_id=" << partition_id << " version=" << i.second;

            // Report current version + 1 as the version of this txn to the caller.
            response->add_table_ids(table_id);
            response->add_partition_ids(partition_id);
            response->add_versions(i.second + 1);
        }

        process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
            return;
        }

        // Save table versions
        for (auto& i : table_id_tablet_ids) {
            std::string ver_key = table_version_key({instance_id, db_id, i.first});
            txn->atomic_add(ver_key, 1);
            LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key)
                      << " txn_id=" << txn_id;
        }

        VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes()
                   << " num_put_keys=" << txn->num_put_keys()
                   << " num_del_keys=" << txn->num_del_keys()
                   << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;

        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }

        // NOTE(review): test hook; presumably lets tests return here before the lazy
        // commit is submitted -- confirm against the sync point implementation.
        TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit",
                                         &txn_id);
        std::shared_ptr<TxnLazyCommitTask> task = txn_lazy_committer->submit(instance_id, txn_id);
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::txn_lazy_committer_wait", &txn_id);
        std::pair<MetaServiceCode, std::string> ret = task->wait();
        // A failed lazy commit is only logged: the KV commit above already succeeded, and
        // `code`/`msg` are intentionally left untouched.
        if (ret.first != MetaServiceCode::OK) {
            LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first
                         << " msg=" << ret.second;
        }

        std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
        for (auto& [_, i] : tmp_rowsets_meta) {
            // Accumulate affected rows
            auto& stats = tablet_stats[i.tablet_id()];
            stats.data_size += i.total_disk_size();
            stats.num_rows += i.num_rows();
            ++stats.num_rowsets;
            stats.num_segs += i.num_segments();
            stats.index_size += i.index_disk_size();
            stats.segment_size += i.data_disk_size();
        }

        // calculate table stats from tablets stats
        std::map<int64_t /*table_id*/, TableStats> table_stats;
        std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
                                             request->base_tablet_ids().end());
        calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
        for (const auto& pair : table_stats) {
            TableStatsPB* stats_pb = response->add_table_stats();
            auto table_id = pair.first;
            auto stats = pair.second;
            get_pb_from_tablestats(stats, stats_pb);
            stats_pb->set_table_id(table_id);
            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
                       << " table_id=" << table_id
                       << " updated_row_count=" << stats_pb->updated_row_count();
        }

        // txn set visible for fe callback
        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
        response->mutable_txn_info()->CopyFrom(txn_info);
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::finish", &code, &txn_id);
        break;
    } while (true);
}