in cloud/src/meta-service/meta_service_job.cpp [683:1096]
/**
 * Handles a FinishTabletJobRequest that targets a compaction job of a tablet.
 *
 * Depending on `request->action()` the function does one of:
 *   - ABORT : erases the recorded compaction job and, if the request carries a delete bitmap
 *             lock initiator, removes the compaction delete bitmap update lock.
 *   - LEASE : updates the lease of the recorded compaction job.
 *   - COMMIT: updates tablet stats, writes recycle entries for the input rowsets and removes
 *             their meta keys, turns the tmp output rowset into a formal rowset and erases the
 *             recorded compaction job. A COMMIT whose input versions fail
 *             `check_compaction_input_verions` is turned into an abort and reported with
 *             `JOB_CHECK_ALTER_VERSION`.
 *
 * All mutations are only staged on `txn`; this function never commits. `need_commit` is set to
 * true on every path that staged mutations which are meant to be persisted (presumably the
 * caller commits `txn` when it is set -- confirm against the caller).
 *
 * @param code         [out] result code; only written on failure paths, so it is expected to
 *                     hold the caller's initial value on success
 * @param msg          [out] human readable error message
 * @param ss           scratch stream used to build `msg`
 * @param txn          transaction used for all reads and staged writes
 * @param request      the finish-job request; `request->job().compaction(0)` is the job handled
 * @param response     [out] receives the updated tablet stats and the alter version
 * @param recorded_job [in,out] the job info currently recorded for the tablet; the matching
 *                     compaction entry is updated (LEASE) or erased (ABORT/COMMIT)
 * @param instance_id  instance the tablet belongs to
 * @param job_key      key under which `recorded_job` is stored
 * @param need_commit  [out] set to true when staged mutations should be committed
 */
void process_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
                            std::unique_ptr<Transaction>& txn,
                            const FinishTabletJobRequest* request,
                            FinishTabletJobResponse* response, TabletJobInfoPB& recorded_job,
                            std::string& instance_id, std::string& job_key, bool& need_commit) {
    //==========================================================================
    // check
    //==========================================================================
    int64_t table_id = request->job().idx().table_id();
    int64_t index_id = request->job().idx().index_id();
    int64_t partition_id = request->job().idx().partition_id();
    int64_t tablet_id = request->job().idx().tablet_id();
    if (recorded_job.compaction().empty()) {
        SS << "there is no running compaction, tablet_id=" << tablet_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    // Indexing element 0 of an empty repeated field is invalid, so reject such a request
    // explicitly instead of relying on the caller to have validated it.
    if (request->job().compaction().empty()) {
        SS << "no compaction job in request, tablet_id=" << tablet_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    auto& compaction = request->job().compaction(0);

    // Locate the recorded compaction job with the same id as the requested one.
    auto recorded_compaction = recorded_job.mutable_compaction()->begin();
    for (; recorded_compaction != recorded_job.mutable_compaction()->end(); ++recorded_compaction) {
        if (recorded_compaction->id() == compaction.id()) break;
    }
    if (recorded_compaction == recorded_job.mutable_compaction()->end()) {
        SS << "unmatched job id, recorded_job=" << proto_to_json(recorded_job)
           << " given_job=" << proto_to_json(compaction);
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = ss.str();
        return;
    }

    using namespace std::chrono;
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
    const int64_t now_ms = now * 1000;
    // An expiration of 0 (or less) means the job never expires.
    if (recorded_compaction->expiration() > 0 && recorded_compaction->expiration() < now) {
        code = MetaServiceCode::JOB_EXPIRED;
        SS << "expired compaction job, tablet_id=" << tablet_id
           << " job=" << proto_to_json(*recorded_compaction);
        msg = ss.str();
        // FIXME: Just remove or notify to abort?
        // LOG(INFO) << "remove expired job, tablet_id=" << tablet_id << " key=" << hex(job_key);
        return;
    }

    if (request->action() != FinishTabletJobRequest::COMMIT &&
        request->action() != FinishTabletJobRequest::ABORT &&
        request->action() != FinishTabletJobRequest::LEASE) {
        SS << "unsupported action, tablet_id=" << tablet_id << " action=" << request->action();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    // A COMMIT whose input versions do not pass the check is aborted below; the caller is told
    // so via JOB_CHECK_ALTER_VERSION together with the recorded alter version.
    bool abort_compaction = false;
    if (request->action() == FinishTabletJobRequest::COMMIT &&
        !check_compaction_input_verions(compaction, recorded_job, ss)) {
        msg = ss.str();
        INSTANCE_LOG(INFO) << msg;
        abort_compaction = true;
        response->set_alter_version(recorded_job.schema_change().alter_version());
        code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
    }

    // Chooses the delete bitmap lock version: the configured one, or "v1" when random version
    // selection is enabled and `use_new_version_random()` returns false.
    auto pick_delete_bitmap_lock_version = []() {
        std::string lock_version = config::use_delete_bitmap_lock_version;
        if (config::use_delete_bitmap_lock_random_version && !use_new_version_random()) {
            lock_version = "v1";
        }
        return lock_version;
    };

    //==========================================================================
    // Abort
    //==========================================================================
    if (request->action() == FinishTabletJobRequest::ABORT || abort_compaction) {
        // TODO(gavin): mv tmp rowsets to recycle or remove them directly
        recorded_job.mutable_compaction()->erase(recorded_compaction);
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "abort tablet compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        if (compaction.has_delete_bitmap_lock_initiator()) {
            std::string use_version = pick_delete_bitmap_lock_version();
            remove_delete_bitmap_update_lock(
                    txn, instance_id, table_id, tablet_id, COMPACTION_DELETE_BITMAP_LOCK_ID,
                    compaction.delete_bitmap_lock_initiator(), use_version);
        }
        need_commit = true;
        return;
    }

    //==========================================================================
    // Lease
    //==========================================================================
    if (request->action() == FinishTabletJobRequest::LEASE) {
        // The new lease must be positive and must not move the recorded lease backwards.
        if (compaction.lease() <= 0 || recorded_compaction->lease() > compaction.lease()) {
            ss << "invalid lease. recoreded_lease=" << recorded_compaction->lease()
               << " req_lease=" << compaction.lease();
            msg = ss.str();
            code = MetaServiceCode::INVALID_ARGUMENT;
            return;
        }
        recorded_compaction->set_lease(compaction.lease());
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "lease tablet compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        return;
    }

    //==========================================================================
    // Commit
    //==========================================================================
    //
    // 1. update tablet stats
    // 2. move compaction input rowsets to recycle
    // 3. change tmp rowset to formal rowset
    // 4. remove compaction job
    //
    //==========================================================================
    // Update tablet stats
    //==========================================================================
    auto stats = response->mutable_stats();
    TabletStats detached_stats;
    // ATTN: The condition that snapshot read can be used to get tablet stats is: all other transactions that put tablet stats
    //  can make read write conflicts with this transaction on other keys. Currently, if all meta-service nodes are running
    //  with `config::split_tablet_stats = true` can meet the condition.
    //
    // The stats are read into a local status so that a failed read is detected regardless of the
    // value `code` had on entry. Without this check the deltas below would be applied to an
    // empty stats object and staged for commit.
    // NOTE(review): assumes internal_get_tablet_stats reports failure through its `code` argument.
    MetaServiceCode stats_code = MetaServiceCode::OK;
    internal_get_tablet_stats(stats_code, msg, txn.get(), instance_id, request->job().idx(), *stats,
                              detached_stats, config::snapshot_get_tablet_stats);
    if (stats_code != MetaServiceCode::OK) {
        code = stats_code;
        INSTANCE_LOG(WARNING) << "failed to get tablet stats, tablet_id=" << tablet_id
                              << " msg=" << msg;
        return;
    }

    // clang-format off
    // Applies (output - input) of the compaction to the row/size/rowset/segment counters.
    auto apply_compaction_deltas = [&compaction, stats]() {
        stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
        stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
        stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
        stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
        stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
        stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
    };
    // Moves the cumulative point forward only.
    auto advance_cumulative_point = [&compaction, stats]() {
        if (compaction.output_cumulative_point() > stats->cumulative_point()) {
            // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
            // new cumu point has been set, MUST NOT set cumu point back to old value
            stats->set_cumulative_point(compaction.output_cumulative_point());
        }
    };
    // clang-format on

    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
        // Only the cumulative point and counters change; it is set unconditionally here.
        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
        stats->set_cumulative_point(compaction.output_cumulative_point());
        stats->set_last_cumu_compaction_time_ms(now_ms);
    } else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
        advance_cumulative_point();
        apply_compaction_deltas();
        stats->set_last_cumu_compaction_time_ms(now_ms);
    } else if (compaction.type() == TabletCompactionJobPB::BASE) {
        stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
        apply_compaction_deltas();
        stats->set_last_base_compaction_time_ms(now_ms);
    } else if (compaction.type() == TabletCompactionJobPB::FULL) {
        // A full compaction is counted as a base compaction and may also advance the cumu point.
        stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
        advance_cumulative_point();
        apply_compaction_deltas();
        stats->set_last_full_compaction_time_ms(now_ms);
    } else {
        msg = "invalid compaction type";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    auto stats_key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
    auto stats_val = stats->SerializeAsString();

    VLOG_DEBUG << "data size, tablet_id=" << tablet_id << " stats.num_rows=" << stats->num_rows()
               << " stats.data_size=" << stats->data_size()
               << " stats.num_rowsets=" << stats->num_rowsets()
               << " stats.num_segments=" << stats->num_segments()
               << " stats.index_size=" << stats->index_size()
               << " stats.segment_size=" << stats->segment_size()
               << " detached_stats.num_rows=" << detached_stats.num_rows
               << " detached_stats.data_size=" << detached_stats.data_size
               << " detached_stats.num_rowset=" << detached_stats.num_rowsets
               << " detached_stats.num_segments=" << detached_stats.num_segs
               << " detached_stats.index_size=" << detached_stats.index_size
               << " detached_stats.segment_size=" << detached_stats.segment_size
               << " compaction.size_output_rowsets=" << compaction.size_output_rowsets()
               << " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
    txn->put(stats_key, stats_val);

    // The value staged above is serialized BEFORE the detached stats are merged in; the merge
    // below only affects the sanity check and the stats returned in the response.
    merge_tablet_stats(*stats, detached_stats); // this is to check
    if (stats->data_size() < 0 || stats->num_rowsets() < 1) [[unlikely]] {
        INSTANCE_LOG(ERROR) << "buggy data size, tablet_id=" << tablet_id
                            << " stats.num_rows=" << stats->num_rows()
                            << " stats.data_size=" << stats->data_size()
                            << " stats.num_rowsets=" << stats->num_rowsets()
                            << " stats.num_segments=" << stats->num_segments()
                            << " stats.index_size=" << stats->index_size()
                            << " stats.segment_size=" << stats->segment_size()
                            << " detached_stats.num_rows=" << detached_stats.num_rows
                            << " detached_stats.data_size=" << detached_stats.data_size
                            << " detached_stats.num_rowset=" << detached_stats.num_rowsets
                            << " detached_stats.num_segments=" << detached_stats.num_segs
                            << " detached_stats.index_size=" << detached_stats.index_size
                            << " detached_stats.segment_size=" << detached_stats.segment_size
                            << " compaction.size_output_rowsets="
                            << compaction.size_output_rowsets()
                            << " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
        DCHECK(false) << "buggy data size, tablet_id=" << tablet_id;
    }

    VLOG_DEBUG << "update tablet stats tablet_id=" << tablet_id << " key=" << hex(stats_key)
               << " stats=" << proto_to_json(*stats);

    // An empty cumulative compaction has no rowsets to move: just drop the job and finish.
    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
        recorded_job.mutable_compaction()->erase(recorded_compaction);
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "remove compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        return;
    }

    // remove delete bitmap update lock for MoW table
    if (compaction.has_delete_bitmap_lock_initiator()) {
        std::string use_version = pick_delete_bitmap_lock_version();
        bool success = check_and_remove_delete_bitmap_update_lock(
                code, msg, ss, txn, instance_id, table_id, tablet_id,
                COMPACTION_DELETE_BITMAP_LOCK_ID, compaction.delete_bitmap_lock_initiator(),
                use_version);
        if (!success) {
            return;
        }
    }

    //==========================================================================
    // Move input rowsets to recycle
    //==========================================================================
    // Exactly one [start, end] input version pair and one output version/rowset id are expected.
    if (compaction.input_versions_size() != 2 || compaction.output_versions_size() != 1 ||
        compaction.output_rowset_ids_size() != 1) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        SS << "invalid input or output versions, input_versions_size="
           << compaction.input_versions_size()
           << " output_versions_size=" << compaction.output_versions_size()
           << " output_rowset_ids_size=" << compaction.output_rowset_ids_size();
        msg = ss.str();
        return;
    }

    auto start = compaction.input_versions(0);
    auto end = compaction.input_versions(1);
    // Half-open key range [rs_start, rs_end) covering versions start..end inclusive.
    auto rs_start = meta_rowset_key({instance_id, tablet_id, start});
    auto rs_end = meta_rowset_key({instance_id, tablet_id, end + 1});

    std::unique_ptr<RangeGetIterator> it;
    int num_rowsets = 0;
    // Logs the scanned range and the number of rowsets found when this scope is left, on every
    // return path below. The pointer value is a non-null dummy that is never dereferenced.
    std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
            (int*)0x01, [&rs_start, &rs_end, &num_rowsets, &instance_id](int*) {
                INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets << " range=["
                                   << hex(rs_start) << "," << hex(rs_end) << "]";
            });

    auto rs_start1 = rs_start; // cursor for batched range reads
    do {
        TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            SS << "internal error, failed to get rowset range, err=" << err
               << " tablet_id=" << tablet_id << " range=[" << hex(rs_start1) << ", "
               << hex(rs_end) << ")";
            msg = ss.str();
            return;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();

            doris::RowsetMetaCloudPB rs;
            if (!rs.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                SS << "malformed rowset meta, unable to deserialize, tablet_id=" << tablet_id
                   << " key=" << hex(k);
                msg = ss.str();
                return;
            }

            // remove delete bitmap of input rowset for MoW table
            if (compaction.has_delete_bitmap_lock_initiator()) {
                auto delete_bitmap_start =
                        meta_delete_bitmap_key({instance_id, tablet_id, rs.rowset_id_v2(), 0, 0});
                auto delete_bitmap_end = meta_delete_bitmap_key(
                        {instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX, INT64_MAX});
                txn->remove(delete_bitmap_start, delete_bitmap_end);
            }

            // Stage a recycle entry carrying a copy of the input rowset meta.
            auto recycle_key = recycle_rowset_key({instance_id, tablet_id, rs.rowset_id_v2()});
            RecycleRowsetPB recycle_rowset;
            recycle_rowset.set_creation_time(now);
            recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
            recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
            auto recycle_val = recycle_rowset.SerializeAsString();
            txn->put(recycle_key, recycle_val);
            INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
                               << " key=" << hex(recycle_key);

            ++num_rowsets;
            // Remember the last key of this batch so the next read resumes right after it.
            if (!it->has_next()) rs_start1 = k;
        }
        rs_start1.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());

    txn->remove(rs_start, rs_end);

    LOG_INFO("cloud process compaction job txn remove meta rowset key")
            .tag("instance_id", instance_id)
            .tag("tablet_id", tablet_id)
            .tag("start_version", start)
            .tag("end_version", end + 1)
            .tag("rs_start key", hex(rs_start))
            .tag("rs_end key", hex(rs_end));

    TEST_SYNC_POINT_CALLBACK("process_compaction_job::loop_input_done", &num_rowsets);

    // No input rowset in the range: report an error but still drop the job, and ask for the
    // staged mutations to be committed (need_commit is set together with a non-OK code here).
    if (num_rowsets < 1) {
        SS << "too few input rowsets, tablet_id=" << tablet_id << " num_rowsets=" << num_rowsets;
        code = MetaServiceCode::UNDEFINED_ERR;
        msg = ss.str();
        recorded_job.mutable_compaction()->erase(recorded_compaction);
        auto job_val = recorded_job.SerializeAsString();
        txn->put(job_key, job_val);
        INSTANCE_LOG(INFO) << "remove compaction job, tablet_id=" << tablet_id
                           << " key=" << hex(job_key);
        need_commit = true;
        TEST_SYNC_POINT_CALLBACK("process_compaction_job::too_few_rowsets", &need_commit);
        return;
    }

    //==========================================================================
    // Change tmp rowset to formal rowset
    //==========================================================================
    if (compaction.txn_id_size() != 1) {
        SS << "invalid txn_id, txn_id_size=" << compaction.txn_id_size();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    int64_t txn_id = compaction.txn_id(0);
    auto& rowset_id = compaction.output_rowset_ids(0);
    if (txn_id <= 0 || rowset_id.empty()) {
        SS << "invalid txn_id or rowset_id, tablet_id=" << tablet_id << " txn_id=" << txn_id
           << " rowset_id=" << rowset_id;
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, txn_id, tablet_id});
    std::string tmp_rowset_val;
    TxnErrorCode err = txn->get(tmp_rowset_key, &tmp_rowset_val);
    if (err != TxnErrorCode::TXN_OK) {
        SS << "failed to get tmp rowset key"
           << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
           << ", tablet_id=" << tablet_id << " tmp_rowset_key=" << hex(tmp_rowset_key)
           << ", err=" << err;
        msg = ss.str();
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::UNDEFINED_ERR
                                                      : cast_as<ErrCategory::READ>(err);
        return;
    }

    // We don't actually need to parse the rowset meta
    // NOTE(review): the parse result is deliberately left unchecked here; the raw value is what
    // gets written below and only txn_id is inspected as a sanity check -- confirm this
    // leniency is intended.
    doris::RowsetMetaCloudPB rs_meta;
    rs_meta.ParseFromString(tmp_rowset_val);
    if (rs_meta.txn_id() <= 0) {
        SS << "invalid txn_id in output tmp rowset meta, tablet_id=" << tablet_id
           << " txn_id=" << rs_meta.txn_id();
        msg = ss.str();
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    txn->remove(tmp_rowset_key);
    INSTANCE_LOG(INFO) << "remove tmp rowset meta, tablet_id=" << tablet_id
                       << " tmp_rowset_key=" << hex(tmp_rowset_key);

    // The serialized tmp rowset meta is stored unchanged under the formal rowset key.
    int64_t version = compaction.output_versions(0);
    auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
    txn->put(rowset_key, tmp_rowset_val);
    INSTANCE_LOG(INFO) << "put rowset meta, tablet_id=" << tablet_id
                       << " rowset_key=" << hex(rowset_key);

    //==========================================================================
    // Remove compaction job
    //==========================================================================
    // TODO(gavin): move deleted job info into recycle or history
    recorded_job.mutable_compaction()->erase(recorded_compaction);
    auto job_val = recorded_job.SerializeAsString();
    txn->put(job_key, job_val);
    INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id
                       << " key=" << hex(job_key);

    // Report the recorded alter version, or -1 when no schema change records one.
    response->set_alter_version(recorded_job.has_schema_change() &&
                                                recorded_job.schema_change().has_alter_version()
                                        ? recorded_job.schema_change().alter_version()
                                        : -1);
    need_commit = true;
}