in cloud/src/meta-service/meta_service_job.cpp [1098:1483]
// Finishes (COMMIT or ABORT) the schema_change job recorded on a base tablet.
//
// `request->job().idx()` identifies the base tablet and
// `request->job().schema_change().new_tablet_idx()` the new tablet produced by the job.
// The request is validated against the new tablet's meta and against `recorded_job`, the base
// tablet's job record (written back under `job_key`). Then:
//   ABORT  - if the request names the recorded new tablet, clear the schema_change from the base
//            tablet's job record and from the new tablet's job record, dropping the compaction
//            entries that carry this job's delete_bitmap_lock_initiator (logged as STOP_TOKENs).
//            If it names a different new tablet, nothing is written.
//   COMMIT - 1. mark the new tablet PB_RUNNING and set its cumulative point;
//            2. move the new tablet's rowsets in versions [2, alter_version] to recycle;
//            3. rewrite the new tablet's stats (also returned in `response->stats()`);
//            4. promote the tmp rowset of `txn_ids[i]` to formal rowset version
//               `output_versions[i]`;
//            5. remove the schema_change from both job records.
//
// All reads and writes go through `txn`. `need_commit` is set to true only on paths whose
// buffered writes should be committed; every error path sets `code`/`msg` and returns without
// setting it.
//
// NOTE: when the request's new_tablet_idx lacks table/index/partition ids, they are looked up
// and written back into the request through const_cast.
void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
                               std::unique_ptr<Transaction>& txn,
                               const FinishTabletJobRequest* request,
                               FinishTabletJobResponse* response, TabletJobInfoPB& recorded_job,
                               std::string& instance_id, std::string& job_key, bool& need_commit) {
    //==========================================================================
    //                                 check
    //==========================================================================
    int64_t tablet_id = request->job().idx().tablet_id();
    auto& schema_change = request->job().schema_change();
    int64_t new_tablet_id = schema_change.new_tablet_idx().tablet_id();
    if (new_tablet_id <= 0) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "no valid new_tablet_id given";
        return;
    }
    if (new_tablet_id == tablet_id) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "not allow new_tablet_id same with base_tablet_id";
        return;
    }
    // Complete the new tablet's index in place if the request left any id out. The ABORT branch
    // below reads schema_change.new_tablet_idx() and therefore sees the completed ids, which is
    // why this is not simply a local copy.
    auto& new_tablet_idx = const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
    if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
        !new_tablet_idx.has_partition_id()) {
        get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, new_tablet_idx);
        if (code != MetaServiceCode::OK) return;
    }
    int64_t new_table_id = new_tablet_idx.table_id();
    int64_t new_index_id = new_tablet_idx.index_id();
    int64_t new_partition_id = new_tablet_idx.partition_id();

    // Load the new tablet's meta; it must still be NOTREADY for the job to be finished.
    auto new_tablet_key = meta_tablet_key(
            {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
    std::string new_tablet_val;
    doris::TabletMetaCloudPB new_tablet_meta;
    TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
    if (err != TxnErrorCode::TXN_OK) {
        SS << "failed to get new tablet meta"
           << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
           << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
           << " key=" << hex(new_tablet_key) << " err=" << err;
        msg = ss.str();
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
                                                      : cast_as<ErrCategory::READ>(err);
        return;
    }
    if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "malformed tablet meta";
        return;
    }
    // A RUNNING new tablet means an earlier commit of this job already went through.
    if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
        code = MetaServiceCode::JOB_ALREADY_SUCCESS;
        msg = "schema_change job already success";
        return;
    }
    if (!new_tablet_meta.has_tablet_state() ||
        new_tablet_meta.tablet_state() != doris::TabletStatePB::PB_NOTREADY) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid new tablet state";
        return;
    }

    // The base tablet must have a live schema_change job matching this request.
    if (!recorded_job.has_schema_change()) {
        SS << "there is no running schema_change, tablet_id=" << tablet_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    auto& recorded_schema_change = recorded_job.schema_change();
    using namespace std::chrono;
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
    if (recorded_schema_change.expiration() > 0 && recorded_schema_change.expiration() < now) {
        code = MetaServiceCode::JOB_EXPIRED;
        SS << "expired schema_change job, tablet_id=" << tablet_id
           << " job=" << proto_to_json(recorded_schema_change);
        msg = ss.str();
        // FIXME: Just remove or notify to abort?
        // LOG(INFO) << "remove expired job, tablet_id=" << tablet_id << " key=" << hex(job_key);
        return;
    }
    // MUST check initiator to let the retried BE commit this schema_change job.
    if (schema_change.id() != recorded_schema_change.id() ||
        schema_change.initiator() != recorded_schema_change.initiator()) {
        SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
           << " given_id=" << schema_change.id()
           << " recorded_job=" << proto_to_json(recorded_schema_change)
           << " given_job=" << proto_to_json(schema_change);
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = ss.str();
        return;
    }
    if (request->action() != FinishTabletJobRequest::COMMIT &&
        request->action() != FinishTabletJobRequest::ABORT) {
        SS << "unsupported action, tablet_id=" << tablet_id << " action=" << request->action();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    // Load the new tablet's job record, if any. A missing key leaves new_tablet_job_val empty,
    // which the cleanup below uses to skip rewriting it.
    auto new_tablet_job_key = job_tablet_key(
            {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
    std::string new_tablet_job_val;
    TabletJobInfoPB new_recorded_job;
    err = txn->get(new_tablet_job_key, &new_tablet_job_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        SS << "internal error,"
           << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
           << " job=" << proto_to_json(request->job()) << " err=" << err;
        msg = ss.str();
        code = cast_as<ErrCategory::READ>(err);
        return;
    }
    if (err == TxnErrorCode::TXN_OK && !new_recorded_job.ParseFromString(new_tablet_job_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "malformed new tablet recorded job";
        return;
    }

    // Shared by every successful path: if the new tablet has a job record, drop the compaction
    // entries holding this job's delete_bitmap_lock_initiator (STOP_TOKENs), clear its
    // schema_change, and buffer the rewritten record in `txn`.
    auto clear_new_tablet_job = [&]() {
        if (new_tablet_job_val.empty()) return;
        auto& compactions = *new_recorded_job.mutable_compaction();
        auto origin_size = compactions.size();
        compactions.erase(
                std::remove_if(compactions.begin(), compactions.end(),
                               [&](auto& c) {
                                   return c.has_delete_bitmap_lock_initiator() &&
                                          c.delete_bitmap_lock_initiator() ==
                                                  schema_change.delete_bitmap_lock_initiator();
                               }),
                compactions.end());
        if (compactions.size() < origin_size) {
            INSTANCE_LOG(INFO) << "remove " << (origin_size - compactions.size())
                               << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id
                               << " delete_bitmap_lock_initiator="
                               << schema_change.delete_bitmap_lock_initiator()
                               << " key=" << hex(job_key);
        }
        new_recorded_job.clear_schema_change();
        new_tablet_job_val = new_recorded_job.SerializeAsString();
        txn->put(new_tablet_job_key, new_tablet_job_val);
    };

    //==========================================================================
    //                                 Abort
    //==========================================================================
    if (request->action() == FinishTabletJobRequest::ABORT) {
        // Only the recorded new tablet may abort the job; any other target is a silent no-op
        // (code unchanged, nothing to commit).
        if (schema_change.new_tablet_idx().index_id() !=
                    recorded_schema_change.new_tablet_idx().index_id() ||
            schema_change.new_tablet_idx().tablet_id() !=
                    recorded_schema_change.new_tablet_idx().tablet_id()) {
            return;
        }
        recorded_job.clear_schema_change();
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        clear_new_tablet_job();
        INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        return;
    }

    //==========================================================================
    //                                Commit
    //==========================================================================
    //
    // 1. update new_tablet meta
    // 2. move rowsets [2-alter_version] in new_tablet to recycle
    // 3. update new_tablet stats
    // 4. change tmp rowset to formal rowset
    // 5. remove schema_change job
    //
    // Request fields are validated before the writes that depend on them.
    if (!schema_change.has_alter_version()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid alter_version";
        return;
    }

    //==========================================================================
    //                          update tablet meta
    //==========================================================================
    new_tablet_meta.set_tablet_state(doris::TabletStatePB::PB_RUNNING);
    new_tablet_meta.set_cumulative_layer_point(schema_change.output_cumulative_point());
    new_tablet_meta.SerializeToString(&new_tablet_val);
    txn->put(new_tablet_key, new_tablet_val);

    if (schema_change.alter_version() < 2) {
        // No rowsets to recycle and no need to update stats.
        // NOTE(review): unlike the full commit path below, this branch does not clear the
        // schema_change recorded on the base tablet (job_key), does not release the MoW delete
        // bitmap lock, and does not promote tmp rowsets. Confirm that this is intended.
        clear_new_tablet_job();
        need_commit = true;
        return;
    }

    // Each txn_ids[i] is paired with output_versions[i]. Reject mismatched lists up front:
    // indexing output_versions past its end would trip protobuf's bounds check and abort.
    if (schema_change.txn_ids().empty() || schema_change.output_versions().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty txn_ids or output_versions";
        return;
    }
    if (schema_change.txn_ids().size() != schema_change.output_versions().size()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        SS << "txn_ids and output_versions size mismatch, txn_ids_size="
           << schema_change.txn_ids().size()
           << " output_versions_size=" << schema_change.output_versions().size()
           << " tablet_id=" << new_tablet_id;
        msg = ss.str();
        return;
    }

    //==========================================================================
    //                 move rowsets [2-alter_version] to recycle
    //==========================================================================
    int64_t num_remove_rows = 0;
    int64_t size_remove_rowsets = 0;
    int64_t num_remove_rowsets = 0;
    int64_t num_remove_segments = 0;
    int64_t index_size_remove_rowsets = 0;
    int64_t segment_size_remove_rowsets = 0;
    auto rs_start = meta_rowset_key({instance_id, new_tablet_id, 2});
    auto rs_end = meta_rowset_key({instance_id, new_tablet_id, schema_change.alter_version() + 1});
    std::unique_ptr<RangeGetIterator> it;
    auto rs_start1 = rs_start;
    do {
        err = txn->get(rs_start1, rs_end, &it);
        if (err != TxnErrorCode::TXN_OK) {
            code = MetaServiceCode::KV_TXN_GET_ERR;
            SS << "internal error, failed to get rowset range, err=" << err
               << " tablet_id=" << new_tablet_id << " range=[" << hex(rs_start1) << ", "
               << hex(rs_end) << ")";
            msg = ss.str();
            return;
        }
        while (it->has_next()) {
            auto [k, v] = it->next();
            doris::RowsetMetaCloudPB rs;
            if (!rs.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                SS << "malformed rowset meta, unable to deserialize, tablet_id=" << new_tablet_id
                   << " key=" << hex(k);
                msg = ss.str();
                return;
            }
            num_remove_rows += rs.num_rows();
            size_remove_rowsets += rs.total_disk_size();
            ++num_remove_rowsets;
            num_remove_segments += rs.num_segments();
            index_size_remove_rowsets += rs.index_disk_size();
            segment_size_remove_rowsets += rs.data_disk_size();
            auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, rs.rowset_id_v2()});
            RecycleRowsetPB recycle_rowset;
            recycle_rowset.set_creation_time(now);
            recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
            recycle_rowset.set_type(RecycleRowsetPB::DROP);
            auto recycle_val = recycle_rowset.SerializeAsString();
            txn->put(recycle_key, recycle_val);
            INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << new_tablet_id
                               << " key=" << hex(recycle_key);
            if (!it->has_next()) rs_start1 = k;
        }
        rs_start1.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());
    txn->remove(rs_start, rs_end);

    //==========================================================================
    //                        update new_tablet stats
    //==========================================================================
    auto stats = response->mutable_stats();
    TabletStats detached_stats;
    // ATTN: The condition that snapshot read can be used to get tablet stats is: all other transactions that put tablet stats
    //  can make read write conflicts with this transaction on other keys. Currently, if all meta-service nodes are running
    //  with `config::split_tablet_stats = true` can meet the condition.
    internal_get_tablet_stats(code, msg, txn.get(), instance_id, new_tablet_idx, *stats,
                              detached_stats, config::snapshot_get_tablet_stats);
    if (code != MetaServiceCode::OK) {
        // `msg` was set by internal_get_tablet_stats. Continuing would persist stats derived
        // from an unread baseline and commit the job despite the reported error.
        return;
    }
    // clang-format off
    stats->set_cumulative_point(schema_change.output_cumulative_point());
    stats->set_num_rows(stats->num_rows() + (schema_change.num_output_rows() - num_remove_rows));
    stats->set_data_size(stats->data_size() + (schema_change.size_output_rowsets() - size_remove_rowsets));
    stats->set_num_rowsets(stats->num_rowsets() + (schema_change.num_output_rowsets() - num_remove_rowsets));
    stats->set_num_segments(stats->num_segments() + (schema_change.num_output_segments() - num_remove_segments));
    stats->set_index_size(stats->index_size() + (schema_change.index_size_output_rowsets() - index_size_remove_rowsets));
    stats->set_segment_size(stats->segment_size() + (schema_change.segment_size_output_rowsets() - segment_size_remove_rowsets));
    // clang-format on
    auto stats_key = stats_tablet_key(
            {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
    auto stats_val = stats->SerializeAsString();
    txn->put(stats_key, stats_val);
    merge_tablet_stats(*stats, detached_stats);
    VLOG_DEBUG << "update tablet stats tablet_id=" << new_tablet_id << " key=" << hex(stats_key)
               << " stats=" << proto_to_json(*stats);

    //==========================================================================
    //                  change tmp rowset to formal rowset
    //==========================================================================
    // process mow table, check lock
    if (new_tablet_meta.enable_unique_key_merge_on_write()) {
        std::string use_version = config::use_delete_bitmap_lock_version;
        if (config::use_delete_bitmap_lock_random_version && !use_new_version_random()) {
            use_version = "v1";
        }
        bool success = check_and_remove_delete_bitmap_update_lock(
                code, msg, ss, txn, instance_id, new_table_id, new_tablet_id,
                SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, schema_change.delete_bitmap_lock_initiator(),
                use_version);
        if (!success) {
            return;
        }
    }
    for (int i = 0; i < schema_change.txn_ids().size(); ++i) {
        auto tmp_rowset_key =
                meta_rowset_tmp_key({instance_id, schema_change.txn_ids().at(i), new_tablet_id});
        std::string tmp_rowset_val;
        // FIXME: async get
        err = txn->get(tmp_rowset_key, &tmp_rowset_val);
        if (err != TxnErrorCode::TXN_OK) {
            SS << "failed to get tmp rowset key"
               << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
               << ", tablet_id=" << new_tablet_id << " tmp_rowset_key=" << hex(tmp_rowset_key)
               << ", err=" << err;
            msg = ss.str();
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::UNDEFINED_ERR
                                                          : cast_as<ErrCategory::READ>(err);
            return;
        }
        auto rowset_key = meta_rowset_key(
                {instance_id, new_tablet_id, schema_change.output_versions().at(i)});
        txn->put(rowset_key, tmp_rowset_val);
        txn->remove(tmp_rowset_key);
    }

    //==========================================================================
    //                      remove schema_change job
    //==========================================================================
    recorded_job.clear_schema_change();
    auto job_val = recorded_job.SerializeAsString();
    txn->put(job_key, job_val);
    clear_new_tablet_job();
    INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
                       << " key=" << hex(job_key);
    need_commit = true;
}