in cloud/src/meta-service/meta_service_txn.cpp [1938:2515]
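// Commits a txn that is made up of multiple sub-transactions: the tmp rowsets of every
// sub_txn are turned into visible rowsets, each partition gets one new version per
// sub_txn that wrote to it, and rowset metas, partition/table versions, tablet stats
// and the txn state are persisted atomically in a single KV transaction.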
void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* response,
std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code,
std::string& msg, const std::string& instance_id) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
auto sub_txn_infos = request->sub_txn_infos();
    // Create a readonly txn for scanning tmp rowsets
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// Get db id with txn id
std::string index_val;
const std::string index_key = txn_index_key({instance_id, txn_id});
err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
    DCHECK(index_pb.has_tablet_index());
    DCHECK(index_pb.tablet_index().has_db_id());
int64_t db_id = index_pb.tablet_index().db_id();
// Get temporary rowsets involved in the txn
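    // sub_txn_id -> [(tmp_rowset_key, tmp_rowset_meta), ...]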
std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
sub_txn_to_tmp_rowsets_meta;
for (const auto& sub_txn_info : sub_txn_infos) {
auto sub_txn_id = sub_txn_info.sub_txn_id();
// This is a range scan
MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
std::string rs_tmp_key0;
std::string rs_tmp_key1;
meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
        // Get rowset metas that should be committed
// tmp_rowset_key -> rowset_meta
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta;
int num_rowsets = 0;
std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
(int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, &sub_txn_id](int*) {
LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
<< ", sub_txn_id=" << sub_txn_id << " num_rowsets=" << num_rowsets
<< " range=[" << hex(rs_tmp_key0) << "," << hex(rs_tmp_key1) << ")";
});
std::unique_ptr<RangeGetIterator> it;
do {
err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
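            // The read-only txn may expire (TXN_TOO_OLD) during a long scan; recreate it
            // and retry the range get once before giving up.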
if (err == TxnErrorCode::TXN_TOO_OLD) {
err = txn_kv->create_txn(&txn);
if (err == TxnErrorCode::TXN_OK) {
err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
}
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
while (it->has_next()) {
auto [k, v] = it->next();
LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " txn_id=" << txn_id;
tmp_rowsets_meta.emplace_back();
if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id
<< " key=" << hex(k);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// Save keys that will be removed later
tmp_rowsets_meta.back().first = std::string(k.data(), k.size());
++num_rowsets;
if (!it->has_next()) rs_tmp_key0 = k;
}
rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
<< " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, std::move(tmp_rowsets_meta));
}
    // Create a read/write txn to guarantee consistency
txn.reset();
err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// Get txn info with db_id and txn_id
std::string info_val; // Will be reused when saving updated txn
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
    // TODO: do more checks, e.g. txn state
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
code = MetaServiceCode::OK;
ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(INFO) << msg;
response->mutable_txn_info()->CopyFrom(txn_info);
return;
}
LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();
// Prepare rowset meta and new_versions
// Read tablet indexes in batch.
std::map<int64_t, int64_t> tablet_id_to_idx;
std::vector<std::string> tablet_idx_keys;
std::vector<int64_t> partition_ids;
auto idx = 0;
for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
for (auto& [_, i] : tmp_rowsets_meta) {
auto tablet_id = i.tablet_id();
if (tablet_id_to_idx.count(tablet_id) == 0) {
tablet_id_to_idx.emplace(tablet_id, idx);
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
partition_ids.push_back(i.partition_id());
idx++;
}
}
}
std::vector<std::optional<std::string>> tablet_idx_values;
err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, Transaction::BatchGetOptions(false));
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get tablet table index ids, err=" << err;
msg = ss.str();
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
// tablet_id -> {table/index/partition}_id
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
// table_id -> tablets_ids
std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
for (auto [tablet_id, i] : tablet_id_to_idx) {
if (!tablet_idx_values[i].has_value()) [[unlikely]] {
            // The value must exist
code = MetaServiceCode::KV_TXN_GET_ERR;
ss << "failed to get tablet table index ids, err=not found"
<< " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]);
msg = ss.str();
LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
return;
}
if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "malformed tablet index value tablet_id=" << tablet_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
VLOG_DEBUG << "tablet_id:" << tablet_id
<< " value:" << tablet_ids[tablet_id].ShortDebugString();
}
tablet_idx_keys.clear();
tablet_idx_values.clear();
    // partition_version_key -> version
std::unordered_map<std::string, uint64_t> new_versions;
std::vector<std::string> version_keys;
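    // Collect one partition_version_key per distinct partition touched by the txn;
    // the current versions are fetched with a single batch_get below.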
for (auto& [tablet_id, i] : tablet_id_to_idx) {
int64_t table_id = tablet_ids[tablet_id].table_id();
int64_t partition_id = partition_ids[i];
std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
if (new_versions.count(ver_key) == 0) {
new_versions.insert({ver_key, 0});
LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << " txn_id=" << txn_id
<< ", db_id=" << db_id << ", table_id=" << table_id
<< ", partition_id=" << partition_id;
version_keys.push_back(std::move(ver_key));
}
}
std::vector<std::optional<std::string>> version_values;
err = txn->batch_get(&version_values, version_keys);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get partition versions, err=" << err;
msg = ss.str();
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
size_t total_versions = version_keys.size();
for (size_t i = 0; i < total_versions; i++) {
int64_t version;
if (version_values[i].has_value()) {
VersionPB version_pb;
if (!version_pb.ParseFromString(version_values[i].value())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse version pb txn_id=" << txn_id
<< " key=" << hex(version_keys[i]);
msg = ss.str();
return;
}
version = version_pb.version();
} else {
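            // No version key yet: the partition has never been written, so it starts at
            // version 1 (the first sub_txn committing to it below bumps it to 2).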
version = 1;
}
new_versions[version_keys[i]] = version;
LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
<< " version:" << version << " txn_id=" << txn_id;
}
version_keys.clear();
version_values.clear();
std::vector<std::pair<std::string, std::string>> rowsets;
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
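    // Assign versions: within one sub_txn all rowsets of a partition share the same new
    // version, and every subsequent sub_txn writing to that partition advances it again.
    // E.g. a partition currently at version v that is written by two sub_txns ends up at
    // v+2, with their rowsets at [v+1, v+1] and [v+2, v+2] respectively.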
for (const auto& sub_txn_info : sub_txn_infos) {
auto sub_txn_id = sub_txn_info.sub_txn_id();
        auto& tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id]; // no copy
std::unordered_map<int64_t, int64_t> partition_id_to_version;
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
int64_t table_id = tablet_ids[tablet_id].table_id();
int64_t partition_id = i.partition_id();
std::string ver_key =
partition_version_key({instance_id, db_id, table_id, partition_id});
if (new_versions.count(ver_key) == 0) [[unlikely]] {
                // This should never happen: every partition touched by the txn was
                // preloaded into new_versions above.
                code = MetaServiceCode::UNDEFINED_ERR;
                ss << "failed to get partition version key, the target version does not exist "
                      "in new_versions."
<< " txn_id=" << txn_id << ", db_id=" << db_id << ", table_id=" << table_id
<< ", partition_id=" << partition_id;
msg = ss.str();
LOG(ERROR) << msg;
return;
}
// Update rowset version
int64_t new_version = new_versions[ver_key];
if (partition_id_to_version.count(partition_id) == 0) {
new_versions[ver_key] = new_version + 1;
new_version = new_versions[ver_key];
partition_id_to_version[partition_id] = new_version;
}
i.set_start_version(new_version);
i.set_end_version(new_version);
LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
<< ", sub_txn_id=" << sub_txn_id << ", table_id=" << table_id
<< ", partition_id=" << partition_id << ", tablet_id=" << tablet_id
<< ", new_version=" << new_version;
std::string key = meta_rowset_key({instance_id, tablet_id, i.end_version()});
std::string val;
if (!i.SerializeToString(&val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
msg = ss.str();
return;
}
rowsets.emplace_back(std::move(key), std::move(val));
            // Accumulate the stats of the affected tablet
auto& stats = tablet_stats[tablet_id];
stats.data_size += i.total_disk_size();
stats.num_rows += i.num_rows();
++stats.num_rowsets;
stats.num_segs += i.num_segments();
stats.index_size += i.index_disk_size();
stats.segment_size += i.data_disk_size();
} // for tmp_rowsets_meta
}
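    // Extra handling for merge-on-write (MoW) tables; abort the commit early on failure.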
process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
}
// Save rowset meta
for (auto& i : rowsets) {
size_t rowset_size = i.first.size() + i.second.size();
txn->put(i.first, i.second);
LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id
<< " rowset_size=" << rowset_size;
}
// Save versions
for (auto& i : new_versions) {
std::string ver_val;
VersionPB version_pb;
version_pb.set_version(i.second);
if (!version_pb.SerializeToString(&ver_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize version_pb when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(i.first, ver_val);
LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second
<< " txn_id=" << txn_id;
std::string_view ver_key = i.first;
ver_key.remove_prefix(1); // Remove key space
// PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id}
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
int ret = decode_key(&ver_key, &out);
if (ret != 0) [[unlikely]] {
            // A failure to decode the version key means something is wrong;
            // we cannot continue this txn
LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(ver_key);
code = MetaServiceCode::UNDEFINED_ERR;
msg = "decode version key error";
return;
}
int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
<< " partition_id=" << partition_id << " version=" << i.second;
response->add_table_ids(table_id);
response->add_partition_ids(partition_id);
response->add_versions(i.second);
}
// Save table versions
for (auto& i : table_id_tablet_ids) {
std::string ver_key = table_version_key({instance_id, db_id, i.first});
txn->atomic_add(ver_key, 1);
LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " txn_id=" << txn_id;
}
LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
// Update txn_info
txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
auto now_time = system_clock::now();
uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
code = MetaServiceCode::UNDEFINED_ERR;
msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
<< " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
return;
}
txn_info.set_commit_time(commit_time);
txn_info.set_finish_time(commit_time);
if (request->has_commit_attachment()) {
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
info_val.clear();
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(info_key, info_val);
LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
    // Update stats of affected tablets
std::deque<std::string> kv_pool;
std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> update_tablet_stats;
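    // Two storage layouts for tablet stats: with config::split_tablet_stats each field has
    // its own key updated via atomic_add; otherwise a single TabletStatsPB value is
    // read-modify-written.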
if (config::split_tablet_stats) {
update_tablet_stats = [&](const StatsTabletKeyInfo& info, const TabletStats& stats) {
if (stats.num_segs > 0) {
auto& data_size_key = kv_pool.emplace_back();
stats_tablet_data_size_key(info, &data_size_key);
txn->atomic_add(data_size_key, stats.data_size);
auto& num_rows_key = kv_pool.emplace_back();
stats_tablet_num_rows_key(info, &num_rows_key);
txn->atomic_add(num_rows_key, stats.num_rows);
auto& num_segs_key = kv_pool.emplace_back();
stats_tablet_num_segs_key(info, &num_segs_key);
txn->atomic_add(num_segs_key, stats.num_segs);
auto& index_size_key = kv_pool.emplace_back();
stats_tablet_index_size_key(info, &index_size_key);
txn->atomic_add(index_size_key, stats.index_size);
auto& segment_size_key = kv_pool.emplace_back();
stats_tablet_segment_size_key(info, &segment_size_key);
txn->atomic_add(segment_size_key, stats.segment_size);
}
auto& num_rowsets_key = kv_pool.emplace_back();
stats_tablet_num_rowsets_key(info, &num_rowsets_key);
txn->atomic_add(num_rowsets_key, stats.num_rowsets);
};
} else {
update_tablet_stats = [&](const StatsTabletKeyInfo& info, const TabletStats& stats) {
auto& key = kv_pool.emplace_back();
stats_tablet_key(info, &key);
auto& val = kv_pool.emplace_back();
TxnErrorCode err = txn->get(key, &val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
std::get<4>(info));
return;
}
TabletStatsPB stats_pb;
if (!stats_pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("malformed tablet stats value, key={}", hex(key));
return;
}
stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
stats_pb.set_num_rowsets(stats_pb.num_rowsets() + stats.num_rowsets);
stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs);
stats_pb.set_index_size(stats_pb.index_size() + stats.index_size);
stats_pb.set_segment_size(stats_pb.segment_size() + stats.segment_size);
stats_pb.SerializeToString(&val);
txn->put(key, val);
LOG(INFO) << "put stats_tablet_key, key=" << hex(key);
};
}
for (auto& [tablet_id, stats] : tablet_stats) {
DCHECK(tablet_ids.count(tablet_id));
auto& tablet_idx = tablet_ids[tablet_id];
StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id};
update_tablet_stats(info, stats);
if (code != MetaServiceCode::OK) return;
}
// Remove tmp rowset meta
for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
for (auto& [k, _] : tmp_rowsets_meta) {
txn->remove(k);
LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id;
}
}
const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
txn->remove(running_key);
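    // Write a recycle marker (creation time + label) so the txn metadata can be reclaimed later.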
std::string recycle_val;
std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
RecycleTxnPB recycle_pb;
recycle_pb.set_creation_time(commit_time);
recycle_pb.set_label(txn_info.label());
if (!recycle_pb.SerializeToString(&recycle_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(recycle_key, recycle_val);
LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes()
<< " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys()
<< " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;
// Finally we are done...
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE || err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
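            // The KV txn exceeded size limits; log the largest rowset meta and the
            // segment-count distribution to help identify the offending load.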
            size_t max_size = 0, max_num_segments = 0,
                   min_num_segments = std::numeric_limits<size_t>::max(), total_num_segments = 0,
                   total_num_rowsets = 0;
            std::pair<std::string, RowsetMetaCloudPB>* max_rowset_meta = nullptr;
            for (auto& sub_txn : sub_txn_infos) {
                auto it = sub_txn_to_tmp_rowsets_meta.find(sub_txn.sub_txn_id());
                if (it == sub_txn_to_tmp_rowsets_meta.end()) {
                    continue;
                }
                for (auto& rowset_meta : it->second) {
                    if (rowset_meta.second.ByteSizeLong() > max_size) {
                        max_size = rowset_meta.second.ByteSizeLong();
                        max_rowset_meta = &rowset_meta;
                    }
                    if (rowset_meta.second.num_segments() > max_num_segments) {
                        max_num_segments = rowset_meta.second.num_segments();
                    }
                    if (rowset_meta.second.num_segments() < min_num_segments) {
                        min_num_segments = rowset_meta.second.num_segments();
                    }
                    total_num_segments += rowset_meta.second.num_segments();
                    ++total_num_rowsets;
                }
            }
            // Average over all rowsets of all sub txns, not per sub txn.
            size_t avg_num_segments =
                    total_num_rowsets == 0 ? 0 : total_num_segments / total_num_rowsets;
if (max_rowset_meta) {
LOG(WARNING) << "failed to commit kv txn with sub txn"
<< ", err=" << err << ", txn_id=" << txn_id
<< ", total_rowsets=" << rowsets.size()
<< ", avg_num_segments=" << avg_num_segments
<< ", min_num_segments=" << min_num_segments
<< ", max_num_segments=" << max_num_segments
<< ", largest_rowset_size=" << max_size
<< ", largest_rowset_key=" << hex(max_rowset_meta->first)
<< ", largest_rowset_value="
<< max_rowset_meta->second.ShortDebugString();
}
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
    // Calculate table stats from the per-tablet stats
std::map<int64_t /*table_id*/, TableStats> table_stats;
std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
for (const auto& pair : table_stats) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
auto stats = pair.second;
get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
<< " table_id=" << table_id
<< " updated_row_count=" << stats_pb->updated_row_count();
}
response->mutable_txn_info()->CopyFrom(txn_info);
} // end commit_txn_with_sub_txn