void _ingest_binlog()

in be/src/service/backend_service.cpp [115:663]


// Ingests one binlog rowset from a remote backend into `arg->local_tablet`.
//
// The work follows the numbered steps marked in the body:
//   3. fetch the binlog info ("<remote_rowset_id>:<num_segments>") over HTTP;
//   4. fetch the remote rowset meta and rewrite it for the local tablet / txn;
//   5. query the size of, then download, every segment file;
//   6. query the size of, then download, every inverted index file;
//   7. create the rowset, compute the delete bitmap for merge-on-write tablets, commit the txn.
//
// Every downloaded file is checked against the size reported by the remote side and, when the
// remote side returns an md5, against that md5 as well.
//
// Nothing is returned. The outcome is accumulated in a local TStatus which the deferred epilogue
// moves into `arg->tstatus` when that pointer is set. If the final status is not OK the epilogue
// also aborts the txn and deletes every file recorded in `download_success_files`.
//
// Params:
//   engine - storage engine supplying the txn manager, rowset id generation, the pending rowset
//            guard and the delete bitmap executor.
//   arg    - describes the ingest: txn id, partition id, local tablet, the remote request and an
//            optional output TStatus.
void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
    std::optional<HttpClient> client;
    if (config::enable_ingest_binlog_with_persistent_connection) {
        // Save the http client instance for persistent connection
        client = std::make_optional<HttpClient>();
    }

    auto txn_id = arg->txn_id;
    auto partition_id = arg->partition_id;
    auto local_tablet_id = arg->local_tablet_id;
    const auto& local_tablet = arg->local_tablet;
    const auto& local_tablet_uid = local_tablet->tablet_uid();

    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::OTHER, fmt::format("IngestBinlog#TxnId={}", txn_id));
    SCOPED_ATTACH_TASK(mem_tracker);

    auto& request = arg->request;

    MonotonicStopWatch watch(true);
    int64_t total_download_bytes = 0;
    int64_t total_download_files = 0;
    TStatus tstatus;
    // Local paths of files whose download completed; removed again by the epilogue on failure.
    std::vector<std::string> download_success_files;
    // Stage name -> stopwatch reading (microseconds) taken when that stage finished. The watch is
    // never reset in this function, so the readings are cumulative.
    std::unordered_map<std::string_view, uint64_t> elapsed_time_map;

    // Deferred epilogue: reports metrics/logs, rolls back on failure and publishes the status.
    //
    // Everything that is mutated after this point is captured by reference. In particular
    // `download_success_files` is filled in later by the download callbacks, so a by-value copy
    // taken here would always be empty and the cleanup below would delete nothing. The ids and
    // the tablet uid are fixed for the whole call and are captured by value.
    Defer defer {[&engine, &tstatus, &watch, &total_download_bytes, &total_download_files,
                  &elapsed_time_map, &download_success_files, partition_id, txn_id,
                  local_tablet_id, local_tablet_uid, ingest_binlog_tstatus = arg->tstatus]() {
        g_ingest_binlog_latency << watch.elapsed_time_microseconds();
        auto elapsed_time_ms = static_cast<int64_t>(watch.elapsed_time_milliseconds());
        double copy_rate = 0.0;
        if (elapsed_time_ms > 0) {
            // bytes per millisecond divided by 1000 gives (decimal) megabytes per second.
            copy_rate = total_download_bytes / static_cast<double>(elapsed_time_ms) / 1000;
        }
        LOG(INFO) << "ingest binlog elapsed " << elapsed_time_ms << " ms, download "
                  << total_download_files << " files, total " << total_download_bytes
                  << " bytes, avg rate " << copy_rate
                  << " MB/s. result: " << apache::thrift::ThriftDebugString(tstatus);
        // A negative threshold disables the per-stage timing report.
        if (config::ingest_binlog_elapsed_threshold_ms >= 0 &&
            elapsed_time_ms > config::ingest_binlog_elapsed_threshold_ms) {
            auto elapsed_details_view =
                    elapsed_time_map | std::views::transform([](const auto& pair) {
                        return fmt::format("{}:{}", pair.first, pair.second);
                    });
            std::string elapsed_details = fmt::format("{}", fmt::join(elapsed_details_view, ", "));
            LOG(WARNING) << "ingest binlog elapsed " << elapsed_time_ms << " ms, "
                         << elapsed_details;
        }
        if (tstatus.status_code != TStatusCode::OK) {
            // abort txn
            engine.txn_manager()->abort_txn(partition_id, txn_id, local_tablet_id,
                                            local_tablet_uid);
            // delete all successfully downloaded files
            LOG(WARNING) << "will delete downloaded success files due to error " << tstatus;
            std::vector<io::Path> paths;
            paths.reserve(download_success_files.size());
            for (const auto& file : download_success_files) {
                paths.emplace_back(file);
                LOG(WARNING) << "will delete downloaded success file " << file << " due to error";
            }
            // Best effort: a failure to delete is deliberately not reported.
            static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
            LOG(WARNING) << "done delete downloaded success files due to error " << tstatus;
        }

        if (ingest_binlog_tstatus) {
            *ingest_binlog_tstatus = std::move(tstatus);
        }
    }};

    // Records `code` and appends `error_msg` to the outgoing thrift status.
    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) {
        tstatus.__set_status_code(code);
        tstatus.__isset.error_msgs = true;
        tstatus.error_msgs.push_back(std::move(error_msg));
    };

    // Download timeout for a file of `file_size` bytes, derived from the configured low speed
    // limit and never smaller than config::download_low_speed_time.
    auto estimate_download_timeout = [](int64_t file_size) {
        uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
        if (estimate_timeout < config::download_low_speed_time) {
            estimate_timeout = config::download_low_speed_time;
        }
        return estimate_timeout;
    };

    // Step 3: get binlog info
    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
                                      request.remote_port);
    constexpr int max_retry = 3;

    auto get_binlog_info_url =
            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url,
                        "get_binlog_info", request.remote_tablet_id, request.binlog_version);
    std::string binlog_info;
    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) {
        RETURN_IF_ERROR(client->init(get_binlog_info_url));
        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
        return client->execute(&binlog_info);
    };
    auto status = _exec_http_req(client, max_retry, 1, get_binlog_info_cb);
    if (!status.ok()) {
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
                     << ", status=" << status.to_string();
        status.to_thrift(&tstatus);
        return;
    }
    elapsed_time_map.emplace("get_binlog_info", watch.elapsed_time_microseconds());

    // The response is expected to look like "<remote_rowset_id>:<num_segments>".
    std::vector<std::string> binlog_info_parts = absl::StrSplit(binlog_info, ":");
    if (binlog_info_parts.size() != 2) {
        status = Status::RuntimeError("failed to parse binlog info into 2 parts: {}", binlog_info);
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
                     << ", status=" << status.to_string();
        status.to_thrift(&tstatus);
        return;
    }
    std::string remote_rowset_id = std::move(binlog_info_parts[0]);
    int64_t num_segments = -1;
    try {
        num_segments = std::stoll(binlog_info_parts[1]);
    } catch (const std::exception& e) {
        status = Status::RuntimeError("failed to parse num segments from binlog info {}: {}",
                                      binlog_info, e.what());
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
                     << ", status=" << status;
        status.to_thrift(&tstatus);
        return;
    }
    // The count comes from the remote side. A negative value would be converted to a huge
    // size_t by the reserve() calls below and make them throw, so reject it up front.
    if (num_segments < 0) {
        status = Status::RuntimeError("invalid num segments {} in binlog info {}", num_segments,
                                      binlog_info);
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
                     << ", status=" << status;
        status.to_thrift(&tstatus);
        return;
    }

    // Step 4: get rowset meta
    auto get_rowset_meta_url = fmt::format(
            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url,
            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version);
    std::string rowset_meta_str;
    auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) {
        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
        return client->execute(&rowset_meta_str);
    };
    status = _exec_http_req(client, max_retry, 1, get_rowset_meta_cb);
    if (!status.ok()) {
        LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url
                     << ", status=" << status.to_string();
        status.to_thrift(&tstatus);
        return;
    }
    elapsed_time_map.emplace("get_rowset_meta", watch.elapsed_time_microseconds());

    RowsetMetaPB rowset_meta_pb;
    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
        LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url;
        status = Status::InternalError("failed to parse rowset meta");
        status.to_thrift(&tstatus);
        return;
    }
    // save source rowset id and tablet id
    rowset_meta_pb.set_source_rowset_id(remote_rowset_id);
    rowset_meta_pb.set_source_tablet_id(request.remote_tablet_id);
    // rewrite rowset meta
    rowset_meta_pb.set_tablet_id(local_tablet_id);
    rowset_meta_pb.set_partition_id(partition_id);
    rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
    rowset_meta_pb.set_txn_id(txn_id);
    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
    auto rowset_meta = std::make_shared<RowsetMeta>();
    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
        LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url;
        status = Status::InternalError("failed to init rowset meta");
        status.to_thrift(&tstatus);
        return;
    }
    // The local copy gets a freshly allocated rowset id; the guard is handed to commit_txn in
    // step 7.3.
    RowsetId new_rowset_id = engine.next_rowset_id();
    auto pending_rs_guard = engine.pending_local_rowsets().add(new_rowset_id);
    rowset_meta->set_rowset_id(new_rowset_id);
    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());

    // Step 5: get all segment files
    // Step 5.1: get all segment files size
    std::vector<std::string> segment_file_urls;
    segment_file_urls.reserve(num_segments);
    std::vector<uint64_t> segment_file_sizes;
    segment_file_sizes.reserve(num_segments);
    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
        auto get_segment_file_size_url = fmt::format(
                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url,
                "get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index);
        uint64_t segment_file_size = 0;
        // HEAD request: only the content length is needed at this point.
        auto get_segment_file_size_cb = [&get_segment_file_size_url,
                                         &segment_file_size](HttpClient* client) {
            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
            RETURN_IF_ERROR(client->head());
            return client->get_content_length(&segment_file_size);
        };

        status = _exec_http_req(client, max_retry, 1, get_segment_file_size_cb);
        if (!status.ok()) {
            LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url
                         << ", status=" << status.to_string();
            status.to_thrift(&tstatus);
            return;
        }

        segment_file_sizes.push_back(segment_file_size);
        // The same URL is reused for the actual download in step 5.3.
        segment_file_urls.push_back(std::move(get_segment_file_size_url));
    }
    elapsed_time_map.emplace("get_segment_file_size", watch.elapsed_time_microseconds());

    // Step 5.2: check data capacity
    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(),
                                          0ULL); // NOLINT(bugprone-fold-init-type)
    if (!local_tablet->can_add_binlog(total_size)) {
        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size
                     << ", tablet=" << local_tablet->tablet_id();
        status = Status::InternalError("no enough space");
        status.to_thrift(&tstatus);
        return;
    }
    total_download_bytes = total_size;
    total_download_files = num_segments;

    // Step 5.3: get all segment files
    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
        auto segment_file_size = segment_file_sizes[segment_index];
        auto get_segment_file_url = segment_file_urls[segment_index];
        if (config::enable_download_md5sum_check) {
            get_segment_file_url = fmt::format("{}&acquire_md5=true", get_segment_file_url);
        }

        auto segment_path = local_segment_path(local_tablet->tablet_path(),
                                               rowset_meta->rowset_id().to_string(), segment_index);
        LOG(INFO) << "download segment file from " << get_segment_file_url << " to "
                  << segment_path;
        uint64_t estimate_timeout = estimate_download_timeout(segment_file_size);
        auto get_segment_file_cb = [&get_segment_file_url, &segment_path, segment_file_size,
                                    estimate_timeout, &download_success_files](HttpClient* client) {
            RETURN_IF_ERROR(client->init(get_segment_file_url));
            client->set_timeout_ms(estimate_timeout * 1000);
            RETURN_IF_ERROR(client->download(segment_path));
            // Record the file before verifying it so that a failed verification still leaves
            // it on the cleanup list.
            download_success_files.push_back(segment_path);

            std::string remote_file_md5;
            RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
            LOG(INFO) << "download segment file to " << segment_path
                      << ", remote md5: " << remote_file_md5
                      << ", remote size: " << segment_file_size;

            std::error_code ec;
            // Check file length
            uint64_t local_file_size = std::filesystem::file_size(segment_path, ec);
            if (ec) {
                LOG(WARNING) << "download file error" << ec.message();
                return Status::IOError("can't retrive file_size of {}, due to {}", segment_path,
                                       ec.message());
            }

            if (local_file_size != segment_file_size) {
                LOG(WARNING) << "download file length error"
                             << ", get_segment_file_url=" << get_segment_file_url
                             << ", file_size=" << segment_file_size
                             << ", local_file_size=" << local_file_size;
                return Status::RuntimeError(
                        "downloaded file size is not equal, local={}, remote={}", local_file_size,
                        segment_file_size);
            }

            if (!remote_file_md5.empty()) { // keep compatibility
                std::string local_file_md5;
                RETURN_IF_ERROR(
                        io::global_local_filesystem()->md5sum(segment_path, &local_file_md5));
                if (local_file_md5 != remote_file_md5) {
                    LOG(WARNING) << "download file md5 error"
                                 << ", get_segment_file_url=" << get_segment_file_url
                                 << ", remote_file_md5=" << remote_file_md5
                                 << ", local_file_md5=" << local_file_md5;
                    return Status::RuntimeError(
                            "download file md5 is not equal, local={}, remote={}", local_file_md5,
                            remote_file_md5);
                }
            }

            return io::global_local_filesystem()->permission(segment_path,
                                                             io::LocalFileSystem::PERMS_OWNER_RW);
        };

        // Assign to the function-level `status` (as every other step does) instead of
        // shadowing it with a loop-local variable.
        status = _exec_http_req(client, max_retry, 1, get_segment_file_cb);
        if (!status.ok()) {
            LOG(WARNING) << "failed to get segment file from " << get_segment_file_url
                         << ", status=" << status.to_string();
            status.to_thrift(&tstatus);
            return;
        }
    }
    elapsed_time_map.emplace("get_segment_files", watch.elapsed_time_microseconds());

    // Step 6: get all segment index files
    // Step 6.1: get all segment index files size
    // The three vectors below are filled in lock step: entry i of each describes the same file.
    std::vector<std::string> segment_index_file_urls;
    std::vector<uint64_t> segment_index_file_sizes;
    std::vector<std::string> segment_index_file_names;
    auto tablet_schema = rowset_meta->tablet_schema();
    if (tablet_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
        // V1 storage format: one index file per (inverted index, segment) pair.
        for (const auto& index : tablet_schema->inverted_indexes()) {
            auto index_id = index->index_id();
            for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
                auto get_segment_index_file_size_url = fmt::format(
                        "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
                        "}",
                        binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
                        remote_rowset_id, segment_index, index_id);
                uint64_t segment_index_file_size = 0;
                auto get_segment_index_file_size_cb =
                        [&get_segment_index_file_size_url,
                         &segment_index_file_size](HttpClient* client) {
                            RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
                            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
                            RETURN_IF_ERROR(client->head());
                            return client->get_content_length(&segment_index_file_size);
                        };

                auto segment_path =
                        local_segment_path(local_tablet->tablet_path(),
                                           rowset_meta->rowset_id().to_string(), segment_index);
                segment_index_file_names.push_back(InvertedIndexDescriptor::get_index_file_path_v1(
                        InvertedIndexDescriptor::get_index_file_path_prefix(segment_path), index_id,
                        index->get_index_suffix()));

                status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
                if (!status.ok()) {
                    LOG(WARNING) << "failed to get segment index file size from "
                                 << get_segment_index_file_size_url
                                 << ", status=" << status.to_string();
                    status.to_thrift(&tstatus);
                    return;
                }

                segment_index_file_sizes.push_back(segment_index_file_size);
                segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
            }
        }
    } else {
        // Other storage formats: at most one index file per segment, requested with
        // segment_index_id=-1.
        for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
            if (tablet_schema->has_inverted_index()) {
                auto get_segment_index_file_size_url = fmt::format(
                        "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
                        "}",
                        binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
                        remote_rowset_id, segment_index, -1);
                uint64_t segment_index_file_size = 0;
                auto get_segment_index_file_size_cb =
                        [&get_segment_index_file_size_url,
                         &segment_index_file_size](HttpClient* client) {
                            RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
                            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
                            RETURN_IF_ERROR(client->head());
                            return client->get_content_length(&segment_index_file_size);
                        };
                auto segment_path =
                        local_segment_path(local_tablet->tablet_path(),
                                           rowset_meta->rowset_id().to_string(), segment_index);
                segment_index_file_names.push_back(InvertedIndexDescriptor::get_index_file_path_v2(
                        InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)));

                status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
                if (!status.ok()) {
                    LOG(WARNING) << "failed to get segment index file size from "
                                 << get_segment_index_file_size_url
                                 << ", status=" << status.to_string();
                    status.to_thrift(&tstatus);
                    return;
                }

                segment_index_file_sizes.push_back(segment_index_file_size);
                segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
            }
        }
    }
    elapsed_time_map.emplace("get_segment_index_file_size", watch.elapsed_time_microseconds());

    // Step 6.2: check data capacity
    uint64_t total_index_size =
            std::accumulate(segment_index_file_sizes.begin(), segment_index_file_sizes.end(),
                            0ULL); // NOLINT(bugprone-fold-init-type)
    if (!local_tablet->can_add_binlog(total_index_size)) {
        LOG(WARNING) << "failed to add binlog, no enough space, total_index_size="
                     << total_index_size << ", tablet=" << local_tablet->tablet_id();
        status = Status::InternalError("no enough space");
        status.to_thrift(&tstatus);
        return;
    }
    total_download_bytes += total_index_size;
    total_download_files += segment_index_file_urls.size();

    // Step 6.3: get all segment index files
    DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
    DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
    for (size_t i = 0; i < segment_index_file_urls.size(); ++i) {
        auto segment_index_file_size = segment_index_file_sizes[i];
        auto get_segment_index_file_url = segment_index_file_urls[i];
        if (config::enable_download_md5sum_check) {
            get_segment_index_file_url =
                    fmt::format("{}&acquire_md5=true", get_segment_index_file_url);
        }

        uint64_t estimate_timeout = estimate_download_timeout(segment_index_file_size);
        auto local_segment_index_path = segment_index_file_names[i];
        LOG(INFO) << fmt::format("download segment index file from {} to {}",
                                 get_segment_index_file_url, local_segment_index_path);
        auto get_segment_index_file_cb = [&get_segment_index_file_url, &local_segment_index_path,
                                          segment_index_file_size, estimate_timeout,
                                          &download_success_files](HttpClient* client) {
            RETURN_IF_ERROR(client->init(get_segment_index_file_url));
            client->set_timeout_ms(estimate_timeout * 1000);
            RETURN_IF_ERROR(client->download(local_segment_index_path));
            // Record the file before verifying it so that a failed verification still leaves
            // it on the cleanup list.
            download_success_files.push_back(local_segment_index_path);

            std::string remote_file_md5;
            RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));

            LOG(INFO) << "download segment index file to " << local_segment_index_path
                      << ", remote md5: " << remote_file_md5
                      << ", remote size: " << segment_index_file_size;

            std::error_code ec;
            // Check file length
            uint64_t local_index_file_size =
                    std::filesystem::file_size(local_segment_index_path, ec);
            if (ec) {
                LOG(WARNING) << "download index file error" << ec.message();
                return Status::IOError("can't retrive file_size of {}, due to {}",
                                       local_segment_index_path, ec.message());
            }
            if (local_index_file_size != segment_index_file_size) {
                LOG(WARNING) << "download index file length error"
                             << ", get_segment_index_file_url=" << get_segment_index_file_url
                             << ", index_file_size=" << segment_index_file_size
                             << ", local_index_file_size=" << local_index_file_size;
                return Status::RuntimeError(
                        "downloaded index file size is not equal, local={}, remote={}",
                        local_index_file_size, segment_index_file_size);
            }

            if (!remote_file_md5.empty()) { // keep compatibility
                std::string local_file_md5;
                RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_segment_index_path,
                                                                      &local_file_md5));
                if (local_file_md5 != remote_file_md5) {
                    LOG(WARNING) << "download file md5 error"
                                 << ", get_segment_index_file_url=" << get_segment_index_file_url
                                 << ", remote_file_md5=" << remote_file_md5
                                 << ", local_file_md5=" << local_file_md5;
                    return Status::RuntimeError(
                            "download file md5 is not equal, local={}, remote={}", local_file_md5,
                            remote_file_md5);
                }
            }

            return io::global_local_filesystem()->permission(local_segment_index_path,
                                                             io::LocalFileSystem::PERMS_OWNER_RW);
        };

        status = _exec_http_req(client, max_retry, 1, get_segment_index_file_cb);
        if (!status.ok()) {
            LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url
                         << ", status=" << status.to_string();
            status.to_thrift(&tstatus);
            return;
        }
    }
    elapsed_time_map.emplace("get_segment_index_files", watch.elapsed_time_microseconds());

    // Step 7: create rowset && calculate delete bitmap && commit
    // Step 7.1: create rowset
    RowsetSharedPtr rowset;
    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
                                          local_tablet->tablet_path(), rowset_meta, &rowset);
    if (!status) {
        LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet"
                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id
                     << ", status=" << status.to_string();
        status.to_thrift(&tstatus);
        return;
    }

    // Step 7.2 calculate delete bitmap before commit
    auto calc_delete_bitmap_token = engine.calc_delete_bitmap_executor()->create_token();
    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
    RowsetIdUnorderedSet pre_rowset_ids;
    if (local_tablet->enable_unique_key_merge_on_write()) {
        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
        std::vector<segment_v2::SegmentSharedPtr> segments;
        status = beta_rowset->load_segments(&segments);
        if (!status) {
            LOG(WARNING) << "failed to load segments from rowset"
                         << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id
                         << ", status=" << status.to_string();
            status.to_thrift(&tstatus);
            return;
        }
        elapsed_time_map.emplace("load_segments", watch.elapsed_time_microseconds());
        if (segments.size() > 1) {
            // calculate delete bitmap between segments
            status = local_tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(),
                                                                       segments, delete_bitmap);
            if (!status) {
                LOG(WARNING) << "failed to calculate delete bitmap"
                             << ". tablet_id: " << local_tablet->tablet_id()
                             << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id
                             << ", status=" << status.to_string();
                status.to_thrift(&tstatus);
                return;
            }
            elapsed_time_map.emplace("calc_delete_bitmap", watch.elapsed_time_microseconds());
        }

        // The results of the commit-phase update and of waiting on the token are
        // intentionally discarded here; a failure does not stop the commit below.
        static_cast<void>(BaseTablet::commit_phase_update_delete_bitmap(
                local_tablet, rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
                calc_delete_bitmap_token.get(), nullptr));
        elapsed_time_map.emplace("commit_phase_update_delete_bitmap",
                                 watch.elapsed_time_microseconds());
        static_cast<void>(calc_delete_bitmap_token->wait());
        elapsed_time_map.emplace("wait_delete_bitmap", watch.elapsed_time_microseconds());
    }

    // Step 7.3: commit txn
    Status commit_txn_status = engine.txn_manager()->commit_txn(
            local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
            rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),
            rowset_meta->load_id(), rowset, std::move(pending_rs_guard), false);
    // PUSH_TRANSACTION_ALREADY_EXIST is not treated as a failure.
    if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
        auto err_msg = fmt::format(
                "failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, "
                "txn_id={}, status={}",
                rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
                rowset_meta->txn_id(), commit_txn_status.to_string());
        LOG(WARNING) << err_msg;
        set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
        return;
    }
    elapsed_time_map.emplace("commit_txn", watch.elapsed_time_microseconds());

    if (local_tablet->enable_unique_key_merge_on_write()) {
        engine.txn_manager()->set_txn_related_delete_bitmap(partition_id, txn_id, local_tablet_id,
                                                            local_tablet->tablet_uid(), true,
                                                            delete_bitmap, pre_rowset_ids, nullptr);
        elapsed_time_map.emplace("set_txn_related_delete_bitmap",
                                 watch.elapsed_time_microseconds());
    }

    tstatus.__set_status_code(TStatusCode::OK);
}