void commit_txn_immediately()

in cloud/src/meta-service/meta_service_txn.cpp [1001:1419]


/**
 * Commit a load transaction in a single KV transaction and make it VISIBLE immediately.
 *
 * Flow of one attempt (everything is staged into one KV txn and committed at the end):
 *   1. Read and validate the TxnInfoPB stored under txn_info_key(instance_id, db_id, txn_id).
 *      An ABORTED txn is rejected. An already VISIBLE txn returns OK with its txn_info,
 *      or TXN_ALREADY_VISIBLE for 2PC. A 2PC txn still PREPARED is rejected.
 *   2. Batch-read the tablet index of every tmp rowset and group tablets by table_id.
 *   3. Batch-read the partition versions touched by the rowsets and compute `version + 1`
 *      (a partition without a version key starts from 1, so it gets version 2).
 *      If a partition version still carries a pending txn id, this KV txn is dropped.
 *      The lazy committer is then asked to finish that txn and the whole attempt restarts.
 *   4. Stamp the new version on each rowset meta and call process_mow_when_commit_txn.
 *      Then write rowset metas, partition versions, table versions (atomic +1), the
 *      updated txn_info, tablet stats, the recycle key and, for routine-load tasks, the
 *      routine-load progress. Remove the tmp rowset keys and the running key.
 *   5. Commit the KV txn and fill `response` with txn_info and per-table stats.
 *
 * @param request            commit request. txn_id, is_2pc, commit_attachment and
 *                           base_tablet_ids are read.
 * @param response           receives txn_info, version_update_time_ms,
 *                           (table_id, partition_id, version) triples and table_stats.
 * @param txn_kv             KV store; a fresh KV txn is created on every attempt.
 * @param txn_lazy_committer used to finish a pending txn found on a partition version.
 * @param code, msg          out: result code and message. NOTE(review): the
 *                           `code != OK` checks after helper calls assume the caller
 *                           passes code == OK on entry — confirm at the call site.
 * @param instance_id, db_id identify the txn / version keys.
 * @param tmp_rowsets_meta   (tmp rowset key, rowset meta) pairs. The start/end versions of
 *                           the metas are overwritten in place with the new version.
 * @param err                out: the last KV error code observed.
 */
void commit_txn_immediately(
        const CommitTxnRequest* request, CommitTxnResponse* response,
        std::shared_ptr<TxnKv>& txn_kv, std::shared_ptr<TxnLazyCommitter>& txn_lazy_committer,
        MetaServiceCode& code, std::string& msg, const std::string& instance_id, int64_t db_id,
        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
        TxnErrorCode& err) {
    std::stringstream ss;
    int64_t txn_id = request->txn_id();
    // Each iteration builds (and normally commits) one KV txn. The loop repeats only via
    // `continue`, after a pending txn on a touched partition has been finished by the
    // lazy committer. Every other path either returns or breaks.
    do {
        TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
        int64_t last_pending_txn_id = 0;
        std::unique_ptr<Transaction> txn;
        err = txn_kv->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // Get txn info with db_id and txn_id
        std::string info_val; // Will be reused when saving updated txn
        const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
            } else {
                ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
                   << " err=" << err;
            }
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        TxnInfoPB txn_info;
        if (!txn_info.ParseFromString(info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // TODO: do more check like txn state, 2PC etc.
        DCHECK(txn_info.txn_id() == txn_id);
        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            code = MetaServiceCode::TXN_ALREADY_ABORTED;
            ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
            if (request->has_is_2pc() && request->is_2pc()) {
                code = MetaServiceCode::TXN_ALREADY_VISIBLE;
                ss << "transaction [" << txn_id << "] is already visible, not pre-committed.";
                msg = ss.str();
                LOG(INFO) << msg;
                response->mutable_txn_info()->CopyFrom(txn_info);
                return;
            }
            // A repeated non-2PC commit of a visible txn is treated as success.
            code = MetaServiceCode::OK;
            ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(INFO) << msg;
            response->mutable_txn_info()->CopyFrom(txn_info);
            return;
        }

        if (request->has_is_2pc() && request->is_2pc() &&
            txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
            code = MetaServiceCode::TXN_INVALID_STATUS;
            ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id="
               << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();

        // Prepare rowset meta and new_versions
        // Read tablet indexes in batch.
        std::vector<std::string> tablet_idx_keys;
        for (auto& [_, i] : tmp_rowsets_meta) {
            tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
        }
        std::vector<std::optional<std::string>> tablet_idx_values;
        err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
                             Transaction::BatchGetOptions(false));
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get tablet table index ids, err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg << " txn_id=" << txn_id;
            return;
        }

        size_t total_rowsets = tmp_rowsets_meta.size();
        // tablet_id -> {table/index/partition}_id
        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
        // table_id -> tablets_ids
        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
        for (size_t i = 0; i < total_rowsets; i++) {
            int64_t tablet_id = tmp_rowsets_meta[i].second.tablet_id();
            if (!tablet_idx_values[i].has_value()) [[unlikely]] {
                // The value must existed
                code = MetaServiceCode::KV_TXN_GET_ERR;
                ss << "failed to get tablet table index ids, err=not found"
                   << " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]);
                msg = ss.str();
                LOG(WARNING) << msg << " txn_id=" << txn_id;
                return;
            }
            // unordered_map references stay valid across later insertions.
            TabletIndexPB& tablet_idx = tablet_ids[tablet_id];
            if (!tablet_idx.ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "malformed tablet index value tablet_id=" << tablet_id
                   << " txn_id=" << txn_id;
                msg = ss.str();
                LOG(WARNING) << msg;
                return;
            }
            table_id_tablet_ids[tablet_idx.table_id()].push_back(tablet_id);
            VLOG_DEBUG << "tablet_id:" << tablet_id << " value:" << tablet_idx.ShortDebugString();
        }

        tablet_idx_keys.clear();
        tablet_idx_values.clear();

        // {table/partition} -> version
        std::unordered_map<std::string, uint64_t> new_versions;
        std::vector<std::string> version_keys;
        for (auto& [_, i] : tmp_rowsets_meta) {
            int64_t tablet_id = i.tablet_id();
            int64_t table_id = tablet_ids[tablet_id].table_id();
            int64_t partition_id = i.partition_id();
            std::string ver_key =
                    partition_version_key({instance_id, db_id, table_id, partition_id});
            if (new_versions.count(ver_key) == 0) {
                new_versions.insert({ver_key, 0});
                version_keys.push_back(std::move(ver_key));
            }
        }
        std::vector<std::optional<std::string>> version_values;
        err = txn->batch_get(&version_values, version_keys);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get partition versions, err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg << " txn_id=" << txn_id;
            return;
        }
        size_t total_versions = version_keys.size();
        for (size_t i = 0; i < total_versions; i++) {
            // A partition without a version key is treated as being at version 1.
            int64_t version = 1;
            if (version_values[i].has_value()) {
                VersionPB version_pb;
                if (!version_pb.ParseFromString(version_values[i].value())) {
                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                    ss << "failed to parse version pb txn_id=" << txn_id
                       << " key=" << hex(version_keys[i]);
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
                if (version_pb.pending_txn_ids_size() > 0) {
                    // Another txn on this partition is still being committed lazily; it
                    // must be finished before this txn can take the next version.
                    DCHECK(version_pb.pending_txn_ids_size() == 1);
                    last_pending_txn_id = version_pb.pending_txn_ids(0);
                    DCHECK(last_pending_txn_id > 0);
                    break;
                }
                version = version_pb.version();
            }
            new_versions[version_keys[i]] = version + 1;
        }
        version_keys.clear();
        version_values.clear();

        if (last_pending_txn_id > 0) {
            // Drop the current KV txn before waiting; a fresh one is created on retry.
            txn.reset();
            TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::advance_last_pending_txn_id",
                                     &last_pending_txn_id);
            std::shared_ptr<TxnLazyCommitTask> task =
                    txn_lazy_committer->submit(instance_id, last_pending_txn_id);

            std::tie(code, msg) = task->wait();
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
            last_pending_txn_id = 0;
            continue;
        }

        std::vector<std::pair<std::string, std::string>> rowsets;
        std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
        rowsets.reserve(tmp_rowsets_meta.size());
        for (auto& [_, i] : tmp_rowsets_meta) {
            int64_t tablet_id = i.tablet_id();
            int64_t table_id = tablet_ids[tablet_id].table_id();
            int64_t partition_id = i.partition_id();
            std::string ver_key =
                    partition_version_key({instance_id, db_id, table_id, partition_id});
            // Look up without operator[] so a missing key is never inserted.
            auto ver_it = new_versions.find(ver_key);
            if (ver_it == new_versions.end() || ver_it->second == 0) [[unlikely]] {
                // it is impossible.
                code = MetaServiceCode::UNDEFINED_ERR;
                ss << "failed to get partition version key, the target version not exists in "
                      "new_versions."
                   << " txn_id=" << txn_id;
                msg = ss.str();
                LOG(ERROR) << msg;
                return;
            }

            // Update rowset version (modifies the caller's rowset meta in place)
            int64_t new_version = ver_it->second;
            i.set_start_version(new_version);
            i.set_end_version(new_version);

            std::string key = meta_rowset_key({instance_id, tablet_id, i.end_version()});
            std::string val;
            if (!i.SerializeToString(&val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }
            rowsets.emplace_back(std::move(key), std::move(val));

            // Accumulate affected rows
            auto& stats = tablet_stats[tablet_id];
            stats.data_size += i.total_disk_size();
            stats.num_rows += i.num_rows();
            ++stats.num_rowsets;
            stats.num_segs += i.num_segments();
            stats.index_size += i.index_disk_size();
            stats.segment_size += i.data_disk_size();
        } // for tmp_rowsets_meta

        process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
            return;
        }

        // Save rowset meta
        for (auto& i : rowsets) {
            size_t rowset_size = i.first.size() + i.second.size();
            txn->put(i.first, i.second);
            LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id
                      << " rowset_size=" << rowset_size;
        }

        // Save versions
        int64_t version_update_time_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        response->set_version_update_time_ms(version_update_time_ms);
        for (auto& i : new_versions) {
            std::string ver_val;
            VersionPB version_pb;
            version_pb.set_version(i.second);
            version_pb.set_update_time_ms(version_update_time_ms);
            if (!version_pb.SerializeToString(&ver_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize version_pb when saving, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }

            txn->put(i.first, ver_val);
            LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second
                      << " txn_id=" << txn_id << " update_time=" << version_update_time_ms;

            std::string_view ver_key = i.first;
            ver_key.remove_prefix(1); // Remove key space
            // PartitionVersionKeyInfo  {instance_id, db_id, table_id, partition_id}
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
            int ret = decode_key(&ver_key, &out);
            // table_id / partition_id are read from out[4] / out[5] below, so require
            // at least 6 decoded fields before indexing.
            if (ret != 0 || out.size() < 6) [[unlikely]] {
                // decode version key error means this is something wrong,
                // we can not continue this txn
                // Log the full key: decode_key has already consumed part of `ver_key`.
                LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(i.first);
                code = MetaServiceCode::UNDEFINED_ERR;
                msg = "decode version key error";
                return;
            }

            int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
            int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
            VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << partition_id;

            response->add_table_ids(table_id);
            response->add_partition_ids(partition_id);
            response->add_versions(i.second);
        }

        // Save table versions
        for (auto& i : table_id_tablet_ids) {
            std::string ver_key = table_version_key({instance_id, db_id, i.first});
            txn->atomic_add(ver_key, 1);
            LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key)
                      << " txn_id=" << txn_id;
        }

        LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();

        // Update txn_info
        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);

        auto now_time = system_clock::now();
        uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
        // Returning here discards all staged writes, because the KV txn is never committed.
        if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
            LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
                      << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
            return;
        }
        txn_info.set_commit_time(commit_time);
        txn_info.set_finish_time(commit_time);
        if (request->has_commit_attachment()) {
            txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
        }
        LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
        info_val.clear();
        if (!txn_info.SerializeToString(&info_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        txn->put(info_key, info_val);
        LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;

        // Update stats of affected tablet
        for (auto& [tablet_id, stats] : tablet_stats) {
            DCHECK(tablet_ids.count(tablet_id));
            auto& tablet_idx = tablet_ids[tablet_id];
            StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                     tablet_idx.partition_id(), tablet_id};
            update_tablet_stats(info, stats, txn, code, msg);
            if (code != MetaServiceCode::OK) return;
        }
        // Remove tmp rowset meta
        for (auto& [k, _] : tmp_rowsets_meta) {
            txn->remove(k);
            LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id;
        }

        const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
        LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
        txn->remove(running_key);

        std::string recycle_val;
        std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
        RecycleTxnPB recycle_pb;
        recycle_pb.set_creation_time(commit_time);
        recycle_pb.set_label(txn_info.label());

        if (!recycle_pb.SerializeToString(&recycle_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        txn->put(recycle_key, recycle_val);

        if (txn_info.load_job_source_type() ==
            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
            put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
            // Do not make the txn visible if its routine load progress could not be
            // recorded; otherwise the committed data and the stored progress diverge.
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "put routine load progress failed, txn_id=" << txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
        }

        LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
                  << " txn_id=" << txn_id;
        LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
                  << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys()
                  << " num_del_keys=" << txn->num_del_keys()
                  << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;

        TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_immediately::before_commit", &err, &code);
        // Finally we are done...
        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }

        // calculate table stats from tablets stats
        std::map<int64_t /*table_id*/, TableStats> table_stats;
        std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
                                             request->base_tablet_ids().end());
        calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
        for (auto& [table_id, stats] : table_stats) {
            TableStatsPB* stats_pb = response->add_table_stats();
            get_pb_from_tablestats(stats, stats_pb);
            stats_pb->set_table_id(table_id);
            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
                       << " table_id=" << table_id
                       << " updated_row_count=" << stats_pb->updated_row_count();
        }
        response->mutable_txn_info()->CopyFrom(txn_info);
        TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::finish", &code);
        break;
    } while (true);
} // end commit_txn_immediately