void process_schema_change_job()

in cloud/src/meta-service/meta_service_job.cpp [1098:1483]


/**
 * Finishes (COMMIT or ABORT) the schema_change job recorded on a base tablet.
 *
 * Validates the request against the new tablet's meta and the schema_change recorded on the
 * base tablet, then stages the resulting KV mutations in `txn`:
 *  - ABORT:  if the request targets the recorded new tablet, drop the schema_change entry from
 *            the base tablet's job record and, when the new tablet has a job record, from that
 *            record too (together with compactions whose delete_bitmap_lock_initiator matches).
 *            If the new tablet does not match, nothing is staged and no error is reported.
 *  - COMMIT: mark the new tablet PB_RUNNING, move its rowsets in [2, alter_version] to the
 *            recycle space, adjust its stats, promote tmp rowsets txn_ids[i] to formal rowsets
 *            at output_versions[i], and remove the schema_change job entries.
 *
 * The transaction is not committed here; `need_commit` is set to true only on success
 * (presumably the caller commits when it is set). On failure `code`/`msg` describe the error
 * and `need_commit` is not touched.
 *
 * @param code         out: error code on failure.
 * @param msg          out: error message on failure.
 * @param ss           scratch stream used to build error messages.
 * @param txn          transaction all reads/writes go through.
 * @param request      finish request; NOTE: its schema_change.new_tablet_idx may be completed
 *                     in place (see below).
 * @param response     out: receives the new tablet's updated stats on a full COMMIT.
 * @param recorded_job job record of the base tablet, as previously read by the caller.
 * @param instance_id  instance the tablets belong to.
 * @param job_key      key under which `recorded_job` is stored.
 * @param need_commit  out: set to true when `txn` holds changes that should be committed.
 */
void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
                               std::unique_ptr<Transaction>& txn,
                               const FinishTabletJobRequest* request,
                               FinishTabletJobResponse* response, TabletJobInfoPB& recorded_job,
                               std::string& instance_id, std::string& job_key, bool& need_commit) {
    //==========================================================================
    //                                check
    //==========================================================================
    int64_t tablet_id = request->job().idx().tablet_id();
    auto& schema_change = request->job().schema_change();
    int64_t new_tablet_id = schema_change.new_tablet_idx().tablet_id();
    if (new_tablet_id <= 0) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "no valid new_tablet_id given";
        return;
    }
    if (new_tablet_id == tablet_id) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "not allow new_tablet_id same with base_tablet_id";
        return;
    }
    // The request's new_tablet_idx is completed IN PLACE when table/index/partition ids are
    // missing. Later code relies on that: the ABORT branch compares
    // schema_change.new_tablet_idx().index_id() with the recorded job, and the stats lookup
    // below is keyed by this (completed) index.
    auto& new_tablet_idx = const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
    if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
        !new_tablet_idx.has_partition_id()) {
        get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, new_tablet_idx);
        if (code != MetaServiceCode::OK) return;
    }
    int64_t new_table_id = new_tablet_idx.table_id();
    int64_t new_index_id = new_tablet_idx.index_id();
    int64_t new_partition_id = new_tablet_idx.partition_id();

    auto new_tablet_key = meta_tablet_key(
            {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
    std::string new_tablet_val;
    doris::TabletMetaCloudPB new_tablet_meta;
    TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
    if (err != TxnErrorCode::TXN_OK) {
        SS << "failed to get new tablet meta"
           << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
           << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
           << " key=" << hex(new_tablet_key) << " err=" << err;
        msg = ss.str();
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
                                                      : cast_as<ErrCategory::READ>(err);
        return;
    }
    if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "malformed tablet meta";
        return;
    }

    if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
        code = MetaServiceCode::JOB_ALREADY_SUCCESS;
        msg = "schema_change job already success";
        return;
    }
    if (!new_tablet_meta.has_tablet_state() ||
        new_tablet_meta.tablet_state() != doris::TabletStatePB::PB_NOTREADY) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid new tablet state";
        return;
    }

    if (!recorded_job.has_schema_change()) {
        SS << "there is no running schema_change, tablet_id=" << tablet_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    auto& recorded_schema_change = recorded_job.schema_change();
    using namespace std::chrono;
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
    if (recorded_schema_change.expiration() > 0 && recorded_schema_change.expiration() < now) {
        code = MetaServiceCode::JOB_EXPIRED;
        SS << "expired schema_change job, tablet_id=" << tablet_id
           << " job=" << proto_to_json(recorded_schema_change);
        msg = ss.str();
        // FIXME: Just remove or notify to abort?
        // LOG(INFO) << "remove expired job, tablet_id=" << tablet_id << " key=" << hex(job_key);
        return;
    }

    // MUST check initiator to let the retried BE commit this schema_change job.
    if (schema_change.id() != recorded_schema_change.id() ||
        schema_change.initiator() != recorded_schema_change.initiator()) {
        SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
           << " given_id=" << schema_change.id()
           << " recorded_job=" << proto_to_json(recorded_schema_change)
           << " given_job=" << proto_to_json(schema_change);
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = ss.str();
        return;
    }

    if (request->action() != FinishTabletJobRequest::COMMIT &&
        request->action() != FinishTabletJobRequest::ABORT) {
        SS << "unsupported action, tablet_id=" << tablet_id << " action=" << request->action();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    auto new_tablet_job_key = job_tablet_key(
            {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});

    // The new tablet may legitimately have no job record; `new_tablet_job_val` stays empty then.
    std::string new_tablet_job_val;
    TabletJobInfoPB new_recorded_job;
    err = txn->get(new_tablet_job_key, &new_tablet_job_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        SS << "internal error,"
           << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
           << " job=" << proto_to_json(request->job()) << " err=" << err;
        msg = ss.str();
        // TXN_KEY_NOT_FOUND is excluded by the condition above, so this is a read failure.
        code = cast_as<ErrCategory::READ>(err);
        return;
    }
    if (err == TxnErrorCode::TXN_OK && !new_recorded_job.ParseFromString(new_tablet_job_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "malformed new tablet recorded job";
        return;
    }

    // Drops the schema_change entry from the new tablet's job record, together with every
    // compaction entry (logged as a "STOP_TOKEN") whose delete_bitmap_lock_initiator equals this
    // schema change's, and stages the rewritten record in `txn`. No-op when no job record was
    // read for the new tablet.
    auto clear_new_tablet_job = [&]() {
        if (new_tablet_job_val.empty()) return;
        auto& compactions = *new_recorded_job.mutable_compaction();
        auto origin_size = compactions.size();
        compactions.erase(
                std::remove_if(compactions.begin(), compactions.end(),
                               [&](const auto& c) {
                                   return c.has_delete_bitmap_lock_initiator() &&
                                          c.delete_bitmap_lock_initiator() ==
                                                  schema_change.delete_bitmap_lock_initiator();
                               }),
                compactions.end());
        if (compactions.size() < origin_size) {
            INSTANCE_LOG(INFO) << "remove " << (origin_size - compactions.size())
                               << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id
                               << " delete_bitmap_lock_initiator="
                               << schema_change.delete_bitmap_lock_initiator()
                               << " key=" << hex(job_key);
        }
        new_recorded_job.clear_schema_change();
        new_tablet_job_val = new_recorded_job.SerializeAsString();
        txn->put(new_tablet_job_key, new_tablet_job_val);
    };

    //==========================================================================
    //                               Abort
    //==========================================================================
    if (request->action() == FinishTabletJobRequest::ABORT) {
        // Only abort when the request targets the same new tablet as the recorded job;
        // otherwise return without staging any change.
        if (schema_change.new_tablet_idx().index_id() ==
                    recorded_schema_change.new_tablet_idx().index_id() &&
            schema_change.new_tablet_idx().tablet_id() ==
                    recorded_schema_change.new_tablet_idx().tablet_id()) {
            // remove schema change
            recorded_job.clear_schema_change();
            auto job_val = recorded_job.SerializeAsString();
            txn->put(job_key, job_val);
            clear_new_tablet_job();
            INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
                               << " key=" << hex(job_key);

            need_commit = true;
        }
        return;
    }

    //==========================================================================
    //                               Commit
    //==========================================================================
    //
    // 1. update new_tablet meta
    // 2. move rowsets [2-alter_version] in new_tablet to recycle
    // 3. update new_tablet stats
    // 4. change tmp rowset to formal rowset
    // 5. remove schema_change job
    //
    //==========================================================================
    //                          update tablet meta
    //==========================================================================
    new_tablet_meta.set_tablet_state(doris::TabletStatePB::PB_RUNNING);
    new_tablet_meta.set_cumulative_layer_point(schema_change.output_cumulative_point());
    new_tablet_meta.SerializeToString(&new_tablet_val);
    txn->put(new_tablet_key, new_tablet_val);

    //==========================================================================
    //                move rowsets [2-alter_version] to recycle
    //==========================================================================
    if (!schema_change.has_alter_version()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid alter_version";
        return;
    }
    if (schema_change.alter_version() < 2) {
        // No rowsets to recycle and no stats to update.
        // NOTE(review): unlike the full commit path below, the schema_change entry in the base
        // tablet's job record (`job_key`) is not cleared here — confirm this is intended.
        clear_new_tablet_job();
        need_commit = true;
        return;
    }

    // Validate the output description before scanning any rowset range.
    if (schema_change.txn_ids().empty() || schema_change.output_versions().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty txn_ids or output_versions";
        return;
    }
    // txn_ids(i) is promoted to output_versions(i) below; the two lists must line up.
    if (schema_change.txn_ids_size() != schema_change.output_versions_size()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        SS << "mismatched txn_ids and output_versions size, tablet_id=" << new_tablet_id
           << " txn_ids_size=" << schema_change.txn_ids_size()
           << " output_versions_size=" << schema_change.output_versions_size();
        msg = ss.str();
        return;
    }

    int64_t num_remove_rows = 0;
    int64_t size_remove_rowsets = 0;
    int64_t num_remove_rowsets = 0;
    int64_t num_remove_segments = 0;
    int64_t index_size_remove_rowsets = 0;
    int64_t segment_size_remove_rowsets = 0;

    // Range [version 2, alter_version + 1) of the new tablet's rowset metas.
    auto rs_start = meta_rowset_key({instance_id, new_tablet_id, 2});
    auto rs_end = meta_rowset_key({instance_id, new_tablet_id, schema_change.alter_version() + 1});
    std::unique_ptr<RangeGetIterator> it;
    auto rs_start1 = rs_start;
    do {
        err = txn->get(rs_start1, rs_end, &it);
        if (err != TxnErrorCode::TXN_OK) {
            code = MetaServiceCode::KV_TXN_GET_ERR;
            SS << "internal error, failed to get rowset range, err=" << err
               << " tablet_id=" << new_tablet_id << " range=[" << hex(rs_start1) << ", "
               << hex(rs_end) << ")";
            msg = ss.str();
            return;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();

            doris::RowsetMetaCloudPB rs;
            if (!rs.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                SS << "malformed rowset meta, unable to deserialize, tablet_id=" << new_tablet_id
                   << " key=" << hex(k);
                msg = ss.str();
                return;
            }

            num_remove_rows += rs.num_rows();
            size_remove_rowsets += rs.total_disk_size();
            ++num_remove_rowsets;
            num_remove_segments += rs.num_segments();
            index_size_remove_rowsets += rs.index_disk_size();
            segment_size_remove_rowsets += rs.data_disk_size();

            auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, rs.rowset_id_v2()});
            RecycleRowsetPB recycle_rowset;
            recycle_rowset.set_creation_time(now);
            recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
            recycle_rowset.set_type(RecycleRowsetPB::DROP);
            auto recycle_val = recycle_rowset.SerializeAsString();
            txn->put(recycle_key, recycle_val);
            INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << new_tablet_id
                               << " key=" << hex(recycle_key);

            // Remember the last key of this batch so the next range get resumes after it.
            if (!it->has_next()) rs_start1 = k;
        }
        rs_start1.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());

    txn->remove(rs_start, rs_end);

    //==========================================================================
    //                        update new_tablet stats
    //==========================================================================
    auto stats = response->mutable_stats();
    TabletStats detached_stats;
    // ATTN: The condition that snapshot read can be used to get tablet stats is: all other transactions that put tablet stats
    //  can make read write conflicts with this transaction on other keys. Currently, if all meta-service nodes are running
    //  with `config::split_tablet_stats = true` can meet the condition.
    internal_get_tablet_stats(code, msg, txn.get(), instance_id, new_tablet_idx, *stats,
                              detached_stats, config::snapshot_get_tablet_stats);
    // Without valid base stats the deltas below would overwrite the stored stats with garbage.
    if (code != MetaServiceCode::OK) return;
    // clang-format off
    stats->set_cumulative_point(schema_change.output_cumulative_point());
    stats->set_num_rows(stats->num_rows() + (schema_change.num_output_rows() - num_remove_rows));
    stats->set_data_size(stats->data_size() + (schema_change.size_output_rowsets() - size_remove_rowsets));
    stats->set_num_rowsets(stats->num_rowsets() + (schema_change.num_output_rowsets() - num_remove_rowsets));
    stats->set_num_segments(stats->num_segments() + (schema_change.num_output_segments() - num_remove_segments));
    stats->set_index_size(stats->index_size() + (schema_change.index_size_output_rowsets() - index_size_remove_rowsets));
    stats->set_segment_size(stats->segment_size() + (schema_change.segment_size_output_rowsets() - segment_size_remove_rowsets));
    // clang-format on
    auto stats_key = stats_tablet_key(
            {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
    auto stats_val = stats->SerializeAsString();
    txn->put(stats_key, stats_val);
    merge_tablet_stats(*stats, detached_stats);
    VLOG_DEBUG << "update tablet stats tablet_id=" << new_tablet_id << " key=" << hex(stats_key)
               << " stats=" << proto_to_json(*stats);

    //==========================================================================
    //                  change tmp rowset to formal rowset
    //==========================================================================
    // process mow table, check lock
    if (new_tablet_meta.enable_unique_key_merge_on_write()) {
        std::string use_version = config::use_delete_bitmap_lock_version;
        if (config::use_delete_bitmap_lock_random_version && !use_new_version_random()) {
            use_version = "v1";
        }
        bool success = check_and_remove_delete_bitmap_update_lock(
                code, msg, ss, txn, instance_id, new_table_id, new_tablet_id,
                SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, schema_change.delete_bitmap_lock_initiator(),
                use_version);
        if (!success) {
            return;
        }
    }

    for (int i = 0; i < schema_change.txn_ids_size(); ++i) {
        auto tmp_rowset_key =
                meta_rowset_tmp_key({instance_id, schema_change.txn_ids(i), new_tablet_id});
        std::string tmp_rowset_val;
        // FIXME: async get
        err = txn->get(tmp_rowset_key, &tmp_rowset_val);
        if (err != TxnErrorCode::TXN_OK) {
            SS << "failed to get tmp rowset key"
               << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
               << ", tablet_id=" << new_tablet_id << " tmp_rowset_key=" << hex(tmp_rowset_key)
               << ", err=" << err;
            msg = ss.str();
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::UNDEFINED_ERR
                                                          : cast_as<ErrCategory::READ>(err);
            return;
        }
        auto rowset_key =
                meta_rowset_key({instance_id, new_tablet_id, schema_change.output_versions(i)});
        txn->put(rowset_key, tmp_rowset_val);
        txn->remove(tmp_rowset_key);
    }

    //==========================================================================
    //                      remove schema_change job
    //==========================================================================
    recorded_job.clear_schema_change();
    auto job_val = recorded_job.SerializeAsString();
    txn->put(job_key, job_val);
    clear_new_tablet_job();
    INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
                       << " key=" << hex(job_key);

    need_commit = true;
}