void process_compaction_job()

in cloud/src/meta-service/meta_service_job.cpp [683:1096]


// Handles a FinishTabletJobRequest that targets a tablet compaction job.
//
// The first compaction entry of the request is matched by id against the compactions stored in
// `recorded_job`, then the request is dispatched on request->action():
//   - ABORT : erase the recorded compaction, rewrite the job record and, when the request names
//             a delete bitmap lock initiator, remove the delete bitmap update lock.
//   - LEASE : store the requested lease on the recorded compaction (it must be positive and not
//             smaller than the recorded one).
//   - COMMIT: update tablet stats, stage every input rowset as a recycle rowset, promote the tmp
//             output rowset to a formal rowset and erase the recorded compaction. A COMMIT whose
//             input versions fail check_compaction_input_verions() is handled like an abort and
//             reports JOB_CHECK_ALTER_VERSION.
//
// This function only stages reads/writes on `txn`; it never commits. It signals through
// `need_commit` that the staged writes are meant to be persisted (presumably by the caller).
//
// Parameters:
//   code, msg    - out: error code and message; assigned on the failure paths below.
//   ss           - scratch stream used (through the SS macro) to build `msg`.
//   txn          - transaction on which all key-value operations are staged.
//   request      - incoming request; job().idx() supplies the table/index/partition/tablet ids
//                  and job().compaction(0) the compaction being finished.
//                  NOTE(review): compaction(0) is read without a size check here; presumably the
//                  caller guarantees at least one entry.
//   response     - out: receives the updated tablet stats and the alter version.
//   recorded_job - in/out: job record previously stored under `job_key`; mutated and re-serialized.
//   instance_id  - instance used to build every key.
//   job_key      - key under which `recorded_job` is written back.
//   need_commit  - out: set to true on every path whose staged writes should be committed.
void process_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
                            std::unique_ptr<Transaction>& txn,
                            const FinishTabletJobRequest* request,
                            FinishTabletJobResponse* response, TabletJobInfoPB& recorded_job,
                            std::string& instance_id, std::string& job_key, bool& need_commit) {
    //==========================================================================
    //                                check
    //==========================================================================
    int64_t table_id = request->job().idx().table_id();
    int64_t index_id = request->job().idx().index_id();
    int64_t partition_id = request->job().idx().partition_id();
    int64_t tablet_id = request->job().idx().tablet_id();
    if (recorded_job.compaction().empty()) {
        SS << "there is no running compaction, tablet_id=" << tablet_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    auto& compaction = request->job().compaction(0);

    // Locate the recorded compaction that carries the same job id as the request.
    auto recorded_compaction = recorded_job.mutable_compaction()->begin();
    for (; recorded_compaction != recorded_job.mutable_compaction()->end(); ++recorded_compaction) {
        if (recorded_compaction->id() == compaction.id()) break;
    }
    if (recorded_compaction == recorded_job.mutable_compaction()->end()) {
        SS << "unmatched job id, recorded_job=" << proto_to_json(recorded_job)
           << " given_job=" << proto_to_json(compaction);
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = ss.str();
        return;
    }

    // `now` is in seconds since the epoch; an expiration of 0 (or less) means "never expires".
    using namespace std::chrono;
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
    if (recorded_compaction->expiration() > 0 && recorded_compaction->expiration() < now) {
        code = MetaServiceCode::JOB_EXPIRED;
        SS << "expired compaction job, tablet_id=" << tablet_id
           << " job=" << proto_to_json(*recorded_compaction);
        msg = ss.str();
        // FIXME: Just remove or notify to abort?
        // LOG(INFO) << "remove expired job, tablet_id=" << tablet_id << " key=" << hex(job_key);
        return;
    }

    if (request->action() != FinishTabletJobRequest::COMMIT &&
        request->action() != FinishTabletJobRequest::ABORT &&
        request->action() != FinishTabletJobRequest::LEASE) {
        SS << "unsupported action, tablet_id=" << tablet_id << " action=" << request->action();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    // A COMMIT with unacceptable input versions is turned into an abort; the alter version is
    // returned to the requester together with JOB_CHECK_ALTER_VERSION.
    bool abort_compaction = false;
    if (request->action() == FinishTabletJobRequest::COMMIT &&
        !check_compaction_input_verions(compaction, recorded_job, ss)) {
        msg = ss.str();
        INSTANCE_LOG(INFO) << msg;
        abort_compaction = true;
        response->set_alter_version(recorded_job.schema_change().alter_version());
        code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
    }

    //==========================================================================
    //                               Abort
    //==========================================================================
    if (request->action() == FinishTabletJobRequest::ABORT || abort_compaction) {
        // TODO(gavin): mv tmp rowsets to recycle or remove them directly
        recorded_job.mutable_compaction()->erase(recorded_compaction);
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "abort tablet compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        if (compaction.has_delete_bitmap_lock_initiator()) {
            std::string use_version = config::use_delete_bitmap_lock_version;
            if (config::use_delete_bitmap_lock_random_version && !use_new_version_random()) {
                use_version = "v1";
            }
            remove_delete_bitmap_update_lock(
                    txn, instance_id, table_id, tablet_id, COMPACTION_DELETE_BITMAP_LOCK_ID,
                    compaction.delete_bitmap_lock_initiator(), use_version);
        }
        need_commit = true;
        return;
    }

    //==========================================================================
    //                               Lease
    //==========================================================================
    if (request->action() == FinishTabletJobRequest::LEASE) {
        // The lease may only move forward: reject non-positive values and values older than the
        // recorded one.
        if (compaction.lease() <= 0 || recorded_compaction->lease() > compaction.lease()) {
            // Built with SS like every other error path in this function.
            SS << "invalid lease. recoreded_lease=" << recorded_compaction->lease()
               << " req_lease=" << compaction.lease();
            msg = ss.str();
            code = MetaServiceCode::INVALID_ARGUMENT;
            return;
        }
        recorded_compaction->set_lease(compaction.lease());
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "lease tablet compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        return;
    }

    //==========================================================================
    //                               Commit
    //==========================================================================
    //
    // 1. update tablet stats
    // 2. move compaction input rowsets to recycle
    // 3. change tmp rowset to formal rowset
    // 4. remove compaction job
    //
    //==========================================================================
    //                          Update tablet stats
    //==========================================================================
    auto stats = response->mutable_stats();
    TabletStats detached_stats;
    // ATTN: The condition that snapshot read can be used to get tablet stats is: all other transactions that put tablet stats
    //  can make read write conflicts with this transaction on other keys. Currently, if all meta-service nodes are running
    //  with `config::split_tablet_stats = true` can meet the condition.
    //
    // Remember the incoming code so that a failure reported by the stats read can be detected
    // without assuming which value the caller passed in.
    const MetaServiceCode code_before_stats_read = code;
    internal_get_tablet_stats(code, msg, txn.get(), instance_id, request->job().idx(), *stats,
                              detached_stats, config::snapshot_get_tablet_stats);
    if (code != code_before_stats_read) {
        // NOTE(review): internal_get_tablet_stats presumably reports failure through code/msg.
        // Bail out instead of deriving and staging stats from a message that was never populated.
        return;
    }
    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
        stats->set_cumulative_point(compaction.output_cumulative_point());
        stats->set_last_cumu_compaction_time_ms(now * 1000);
    } else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
        // clang-format off
        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
        if (compaction.output_cumulative_point() > stats->cumulative_point()) {
            // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
            // new cumu point has been set, MUST NOT set cumu point back to old value
            stats->set_cumulative_point(compaction.output_cumulative_point());
        }
        stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
        stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
        stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
        stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
        stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
        stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
        stats->set_last_cumu_compaction_time_ms(now * 1000);
        // clang-format on
    } else if (compaction.type() == TabletCompactionJobPB::BASE) {
        // clang-format off
        stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
        stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
        stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
        stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
        stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
        stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
        stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
        stats->set_last_base_compaction_time_ms(now * 1000);
        // clang-format on
    } else if (compaction.type() == TabletCompactionJobPB::FULL) {
        // clang-format off
        stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
        if (compaction.output_cumulative_point() > stats->cumulative_point()) {
            // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
            // new cumu point has been set, MUST NOT set cumu point back to old value
            stats->set_cumulative_point(compaction.output_cumulative_point());
        }
        stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
        stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
        stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
        stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
        stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
        stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
        stats->set_last_full_compaction_time_ms(now * 1000);
        // clang-format on
    } else {
        msg = "invalid compaction type";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    // Serialize before merge_tablet_stats() below: the stored value must not include the detached
    // stats, which are merged in afterwards only for the sanity check and for the response.
    auto stats_key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
    auto stats_val = stats->SerializeAsString();

    VLOG_DEBUG << "data size, tablet_id=" << tablet_id << " stats.num_rows=" << stats->num_rows()
               << " stats.data_size=" << stats->data_size()
               << " stats.num_rowsets=" << stats->num_rowsets()
               << " stats.num_segments=" << stats->num_segments()
               << " stats.index_size=" << stats->index_size()
               << " stats.segment_size=" << stats->segment_size()
               << " detached_stats.num_rows=" << detached_stats.num_rows
               << " detached_stats.data_size=" << detached_stats.data_size
               << " detached_stats.num_rowset=" << detached_stats.num_rowsets
               << " detached_stats.num_segments=" << detached_stats.num_segs
               << " detached_stats.index_size=" << detached_stats.index_size
               << " detached_stats.segment_size=" << detached_stats.segment_size
               << " compaction.size_output_rowsets=" << compaction.size_output_rowsets()
               << " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
    txn->put(stats_key, stats_val);
    merge_tablet_stats(*stats, detached_stats); // this is to check
    if (stats->data_size() < 0 || stats->num_rowsets() < 1) [[unlikely]] {
        INSTANCE_LOG(ERROR) << "buggy data size, tablet_id=" << tablet_id
                            << " stats.num_rows=" << stats->num_rows()
                            << " stats.data_size=" << stats->data_size()
                            << " stats.num_rowsets=" << stats->num_rowsets()
                            << " stats.num_segments=" << stats->num_segments()
                            << " stats.index_size=" << stats->index_size()
                            << " stats.segment_size=" << stats->segment_size()
                            << " detached_stats.num_rows=" << detached_stats.num_rows
                            << " detached_stats.data_size=" << detached_stats.data_size
                            << " detached_stats.num_rowset=" << detached_stats.num_rowsets
                            << " detached_stats.num_segments=" << detached_stats.num_segs
                            << " detached_stats.index_size=" << detached_stats.index_size
                            << " detached_stats.segment_size=" << detached_stats.segment_size
                            << " compaction.size_output_rowsets="
                            << compaction.size_output_rowsets()
                            << " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
        DCHECK(false) << "buggy data size, tablet_id=" << tablet_id;
    }

    VLOG_DEBUG << "update tablet stats tablet_id=" << tablet_id << " key=" << hex(stats_key)
               << " stats=" << proto_to_json(*stats);
    // An EMPTY_CUMULATIVE compaction only touches stats; drop the job and finish here.
    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
        recorded_job.mutable_compaction()->erase(recorded_compaction);
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "remove compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        return;
    }

    // remove delete bitmap update lock for MoW table
    if (compaction.has_delete_bitmap_lock_initiator()) {
        std::string use_version = config::use_delete_bitmap_lock_version;
        if (config::use_delete_bitmap_lock_random_version && !use_new_version_random()) {
            use_version = "v1";
        }
        bool success = check_and_remove_delete_bitmap_update_lock(
                code, msg, ss, txn, instance_id, table_id, tablet_id,
                COMPACTION_DELETE_BITMAP_LOCK_ID, compaction.delete_bitmap_lock_initiator(),
                use_version);
        if (!success) {
            return;
        }
    }

    //==========================================================================
    //                    Move input rowsets to recycle
    //==========================================================================
    // Exactly one [start, end] input version pair and exactly one output version/rowset id are
    // expected.
    if (compaction.input_versions_size() != 2 || compaction.output_versions_size() != 1 ||
        compaction.output_rowset_ids_size() != 1) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        SS << "invalid input or output versions, input_versions_size="
           << compaction.input_versions_size()
           << " output_versions_size=" << compaction.output_versions_size()
           << " output_rowset_ids_size=" << compaction.output_rowset_ids_size();
        msg = ss.str();
        return;
    }

    // The end key is built from `end + 1` so that the rowset at version `end` falls inside the
    // scanned/removed range (presumably the range end is exclusive).
    auto start = compaction.input_versions(0);
    auto end = compaction.input_versions(1);
    auto rs_start = meta_rowset_key({instance_id, tablet_id, start});
    auto rs_end = meta_rowset_key({instance_id, tablet_id, end + 1});

    std::unique_ptr<RangeGetIterator> it;
    int num_rowsets = 0;
    // Scope-exit logger: the unique_ptr owns nothing (dummy non-null pointer) and exists only so
    // that its deleter logs the scanned range on every return path below.
    std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
            (int*)0x01, [&rs_start, &rs_end, &num_rowsets, &instance_id](int*) {
                INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets << " range=["
                                   << hex(rs_start) << "," << hex(rs_end) << "]";
            });

    // Scan the input rowsets batch by batch; rs_start1 is the cursor for the next batch.
    auto rs_start1 = rs_start;
    do {
        TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            SS << "internal error, failed to get rowset range, err=" << err
               << " tablet_id=" << tablet_id << " range=[" << hex(rs_start1) << ", "
               << hex(rs_end) << ")";
            msg = ss.str();
            return;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();

            doris::RowsetMetaCloudPB rs;
            if (!rs.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                SS << "malformed rowset meta, unable to deserialize, tablet_id=" << tablet_id
                   << " key=" << hex(k);
                msg = ss.str();
                return;
            }

            // remove delete bitmap of input rowset for MoW table
            if (compaction.has_delete_bitmap_lock_initiator()) {
                auto delete_bitmap_start =
                        meta_delete_bitmap_key({instance_id, tablet_id, rs.rowset_id_v2(), 0, 0});
                auto delete_bitmap_end = meta_delete_bitmap_key(
                        {instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX, INT64_MAX});
                txn->remove(delete_bitmap_start, delete_bitmap_end);
            }

            // Keep a copy of the input rowset meta under a recycle key, tagged as COMPACT.
            auto recycle_key = recycle_rowset_key({instance_id, tablet_id, rs.rowset_id_v2()});
            RecycleRowsetPB recycle_rowset;
            recycle_rowset.set_creation_time(now);
            recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
            recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
            auto recycle_val = recycle_rowset.SerializeAsString();
            txn->put(recycle_key, recycle_val);
            INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
                               << " key=" << hex(recycle_key);

            ++num_rowsets;
            // Remember the last key of this batch as the base of the next cursor.
            if (!it->has_next()) rs_start1 = k;
        }
        rs_start1.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());

    txn->remove(rs_start, rs_end);

    LOG_INFO("cloud process compaction job txn remove meta rowset key")
            .tag("instance_id", instance_id)
            .tag("tablet_id", tablet_id)
            .tag("start_version", start)
            .tag("end_version", end + 1)
            .tag("rs_start key", hex(rs_start))
            .tag("rs_end key", hex(rs_end));

    TEST_SYNC_POINT_CALLBACK("process_compaction_job::loop_input_done", &num_rowsets);

    if (num_rowsets < 1) {
        // No input rowset was found: report an error but still request a commit so that the
        // removal of the recorded compaction is persisted.
        // NOTE(review): the stats update staged earlier sits in the same txn and would be
        // committed along with it — confirm that this is intended.
        SS << "too few input rowsets, tablet_id=" << tablet_id << " num_rowsets=" << num_rowsets;
        code = MetaServiceCode::UNDEFINED_ERR;
        msg = ss.str();
        recorded_job.mutable_compaction()->erase(recorded_compaction);
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "remove compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        TEST_SYNC_POINT_CALLBACK("process_compaction_job::too_few_rowsets", &need_commit);
        return;
    }

    //==========================================================================
    //                Change tmp rowset to formal rowset
    //==========================================================================
    if (compaction.txn_id_size() != 1) {
        SS << "invalid txn_id, txn_id_size=" << compaction.txn_id_size();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    int64_t txn_id = compaction.txn_id(0);
    auto& rowset_id = compaction.output_rowset_ids(0);
    if (txn_id <= 0 || rowset_id.empty()) {
        SS << "invalid txn_id or rowset_id, tablet_id=" << tablet_id << " txn_id=" << txn_id
           << " rowset_id=" << rowset_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, txn_id, tablet_id});
    std::string tmp_rowset_val;
    TxnErrorCode err = txn->get(tmp_rowset_key, &tmp_rowset_val);
    if (err != TxnErrorCode::TXN_OK) {
        SS << "failed to get tmp rowset key"
           << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
           << ", tablet_id=" << tablet_id << " tmp_rowset_key=" << hex(tmp_rowset_key)
           << ", err=" << err;
        msg = ss.str();
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::UNDEFINED_ERR
                                                      : cast_as<ErrCategory::READ>(err);
        return;
    }

    // The stored bytes are promoted verbatim; parsing is done only to validate them. A payload
    // that cannot be deserialized is rejected so that it never becomes a formal rowset meta.
    doris::RowsetMetaCloudPB rs_meta;
    if (!rs_meta.ParseFromString(tmp_rowset_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        SS << "malformed tmp rowset meta, unable to deserialize, tablet_id=" << tablet_id
           << " tmp_rowset_key=" << hex(tmp_rowset_key);
        msg = ss.str();
        return;
    }
    if (rs_meta.txn_id() <= 0) {
        SS << "invalid txn_id in output tmp rowset meta, tablet_id=" << tablet_id
           << " txn_id=" << rs_meta.txn_id();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    txn->remove(tmp_rowset_key);
    INSTANCE_LOG(INFO) << "remove tmp rowset meta, tablet_id=" << tablet_id
                       << " tmp_rowset_key=" << hex(tmp_rowset_key);

    // Re-home the tmp rowset value under the formal rowset key of the output version.
    int64_t version = compaction.output_versions(0);
    auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
    txn->put(rowset_key, tmp_rowset_val);
    INSTANCE_LOG(INFO) << "put rowset meta, tablet_id=" << tablet_id
                       << " rowset_key=" << hex(rowset_key);

    //==========================================================================
    //                      Remove compaction job
    //==========================================================================
    // TODO(gavin): move deleted job info into recycle or history
    recorded_job.mutable_compaction()->erase(recorded_compaction);
    auto job_val = recorded_job.SerializeAsString();
    txn->put(job_key, job_val);
    INSTANCE_LOG(INFO) << "remove compaction job tablet_id=" << tablet_id
                       << " key=" << hex(job_key);
    // Report the alter version of a recorded schema change, or -1 when there is none.
    response->set_alter_version(recorded_job.has_schema_change() &&
                                                recorded_job.schema_change().has_alter_version()
                                        ? recorded_job.schema_change().alter_version()
                                        : -1);
    need_commit = true;
}