in db/compaction/compaction_job.cc [1301:1708]
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
#ifndef ROCKSDB_LITE
if (db_options_.compaction_service) {
CompactionServiceJobStatus comp_status =
ProcessKeyValueCompactionWithCompactionService(sub_compact);
if (comp_status == CompactionServiceJobStatus::kSuccess ||
comp_status == CompactionServiceJobStatus::kFailure) {
return;
}
// fallback to local compaction
assert(comp_status == CompactionServiceJobStatus::kUseLocal);
}
#endif // !ROCKSDB_LITE
uint64_t prev_cpu_micros = db_options_.clock->CPUMicros();
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
// Create compaction filter and fail the compaction if
// IgnoreSnapshots() = false because it is not supported anymore
const CompactionFilter* compaction_filter =
cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
compaction_filter_from_factory =
sub_compact->compaction->CreateCompactionFilter();
compaction_filter = compaction_filter_from_factory.get();
}
if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
sub_compact->status = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return;
}
NotifyOnSubcompactionBegin(sub_compact);
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_);
const Slice* const start = sub_compact->start;
const Slice* const end = sub_compact->end;
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
read_options.rate_limiter_priority = Env::IO_LOW;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;
// Note: if we're going to support subcompactions for user-defined timestamps,
// the timestamp part will have to be stripped from the bounds here.
assert((!start && !end) || cfd->user_comparator()->timestamp_size() == 0);
read_options.iterate_lower_bound = start;
read_options.iterate_upper_bound = end;
// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> raw_input(
versions_->MakeInputIterator(read_options, sub_compact->compaction,
&range_del_agg, file_options_for_read_));
InternalIterator* input = raw_input.get();
IterKey start_ikey;
IterKey end_ikey;
Slice start_slice;
Slice end_slice;
if (start) {
start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
start_slice = start_ikey.GetInternalKey();
}
if (end) {
end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek);
end_slice = end_ikey.GetInternalKey();
}
std::unique_ptr<InternalIterator> clip;
if (start || end) {
clip = std::make_unique<ClippingIterator>(
raw_input.get(), start ? &start_slice : nullptr,
end ? &end_slice : nullptr, &cfd->internal_comparator());
input = clip.get();
}
std::unique_ptr<InternalIterator> blob_counter;
if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
sub_compact->blob_garbage_meter = std::make_unique<BlobGarbageMeter>();
blob_counter = std::make_unique<BlobCountingIterator>(
input, sub_compact->blob_garbage_meter.get());
input = blob_counter.get();
}
std::unique_ptr<InternalIterator> trim_history_iter;
if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) {
trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
input, cfd->user_comparator(), trim_ts_);
input = trim_history_iter.get();
}
input->SeekToFirst();
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
// I/O measurement variables
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
const uint64_t kRecordStatsEvery = 1000;
uint64_t prev_write_nanos = 0;
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
uint64_t prev_cpu_write_nanos = 0;
uint64_t prev_cpu_read_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
prev_write_nanos = IOSTATS(write_nanos);
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
compaction_filter, db_options_.info_log.get(),
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
snapshot_checker_, compact_->compaction->level(), db_options_.stats);
const MutableCFOptions* mutable_cf_options =
sub_compact->compaction->mutable_cf_options();
assert(mutable_cf_options);
std::vector<std::string> blob_file_paths;
std::unique_ptr<BlobFileBuilder> blob_file_builder(
mutable_cf_options->enable_blob_files
? new BlobFileBuilder(
versions_, fs_.get(),
sub_compact->compaction->immutable_options(),
mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
&blob_file_paths, &sub_compact->blob_file_additions)
: nullptr);
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
Status status;
const std::string* const full_history_ts_low =
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
sub_compact->c_iter.reset(new CompactionIterator(
input, cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_,
manual_compaction_canceled_, db_options_.info_log, full_history_ts_low));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
sub_compact->FillFilesToCutForTtl();
// ShouldStopBefore() maintains state based on keys processed so far. The
// compaction loop always calls it on the "next" key, thus won't tell it the
// first key. So we do that here.
sub_compact->ShouldStopBefore(c_iter->key(),
sub_compact->current_output_file_size);
}
const auto& c_iter_stats = c_iter->iter_stats();
std::unique_ptr<SstPartitioner> partitioner =
sub_compact->compaction->output_level() == 0
? nullptr
: sub_compact->compaction->CreateSstPartitioner();
std::string last_key_for_partitioner;
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
const Slice& key = c_iter->key();
const Slice& value = c_iter->value();
assert(!end ||
cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
if (c_iter_stats.num_input_records % kRecordStatsEvery ==
kRecordStatsEvery - 1) {
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
c_iter->ResetRecordCounts();
RecordCompactionIOStats();
}
// Open output file if necessary
if (sub_compact->builder == nullptr) {
status = OpenCompactionOutputFile(sub_compact);
if (!status.ok()) {
break;
}
}
status = sub_compact->AddToBuilder(key, value);
if (!status.ok()) {
break;
}
status = sub_compact->ProcessOutFlowIfNeeded(key, value);
if (!status.ok()) {
break;
}
sub_compact->current_output_file_size =
sub_compact->builder->EstimatedFileSize();
const ParsedInternalKey& ikey = c_iter->ikey();
sub_compact->current_output()->meta.UpdateBoundaries(
key, value, ikey.sequence, ikey.type);
sub_compact->num_output_records++;
// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
//
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
bool output_file_ended = false;
if (sub_compact->compaction->output_level() != 0 &&
sub_compact->current_output_file_size >=
sub_compact->compaction->max_output_file_size()) {
// (1) this key terminates the file. For historical reasons, the iterator
// status before advancing will be given to FinishCompactionOutputFile().
output_file_ended = true;
}
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
if (partitioner.get()) {
last_key_for_partitioner.assign(c_iter->user_key().data_,
c_iter->user_key().size_);
}
c_iter->Next();
if (c_iter->status().IsManualCompactionPaused()) {
break;
}
if (!output_file_ended && c_iter->Valid()) {
if (((partitioner.get() &&
partitioner->ShouldPartition(PartitionerRequest(
last_key_for_partitioner, c_iter->user_key(),
sub_compact->current_output_file_size)) == kRequired) ||
(sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(
c_iter->key(), sub_compact->current_output_file_size))) &&
sub_compact->builder != nullptr) {
// (2) this key belongs to the next file. For historical reasons, the
// iterator status after advancing will be given to
// FinishCompactionOutputFile().
output_file_ended = true;
}
}
if (output_file_ended) {
const Slice* next_key = nullptr;
if (c_iter->Valid()) {
next_key = &c_iter->key();
}
CompactionIterationStats range_del_out_stats;
status = FinishCompactionOutputFile(input->status(), sub_compact,
&range_del_agg, &range_del_out_stats,
next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
}
}
sub_compact->compaction_job_stats.num_blobs_read =
c_iter_stats.num_blobs_read;
sub_compact->compaction_job_stats.total_blob_bytes_read =
c_iter_stats.total_blob_bytes_read;
sub_compact->compaction_job_stats.num_input_deletion_records =
c_iter_stats.num_input_deletion_records;
sub_compact->compaction_job_stats.num_corrupt_keys =
c_iter_stats.num_input_corrupt_records;
sub_compact->compaction_job_stats.num_single_del_fallthru =
c_iter_stats.num_single_del_fallthru;
sub_compact->compaction_job_stats.num_single_del_mismatch =
c_iter_stats.num_single_del_mismatch;
sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
c_iter_stats.total_input_raw_key_bytes;
sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
c_iter_stats.total_input_raw_value_bytes;
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
c_iter_stats.total_filter_time);
if (c_iter_stats.num_blobs_relocated > 0) {
RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
c_iter_stats.num_blobs_relocated);
}
if (c_iter_stats.total_blob_bytes_relocated > 0) {
RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
c_iter_stats.total_blob_bytes_relocated);
}
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
RecordCompactionIOStats();
if (status.ok() && cfd->IsDropped()) {
status =
Status::ColumnFamilyDropped("Column family dropped during compaction");
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
shutting_down_->load(std::memory_order_relaxed)) {
status = Status::ShutdownInProgress("Database shutdown");
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
((manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
(manual_compaction_canceled_ &&
manual_compaction_canceled_->load(std::memory_order_relaxed)))) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
status = input->status();
}
if (status.ok()) {
status = c_iter->status();
}
if (status.ok() && sub_compact->builder == nullptr &&
sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
// handle subcompaction containing only range deletions
status = OpenCompactionOutputFile(sub_compact);
}
// Call FinishCompactionOutputFile() even if status is not ok: it needs to
// close the output file.
if (sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
&range_del_out_stats);
if (!s.ok() && status.ok()) {
status = s;
}
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
}
if (blob_file_builder) {
if (status.ok()) {
status = blob_file_builder->Finish();
} else {
blob_file_builder->Abandon(status);
}
blob_file_builder.reset();
}
sub_compact->compaction_job_stats.cpu_micros =
db_options_.clock->CPUMicros() - prev_cpu_micros;
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
IOSTATS(write_nanos) - prev_write_nanos;
sub_compact->compaction_job_stats.file_fsync_nanos +=
IOSTATS(fsync_nanos) - prev_fsync_nanos;
sub_compact->compaction_job_stats.file_range_sync_nanos +=
IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
sub_compact->compaction_job_stats.file_prepare_write_nanos +=
IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
sub_compact->compaction_job_stats.cpu_micros -=
(IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1000;
if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
SetPerfLevel(prev_perf_level);
}
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (!status.ok()) {
if (sub_compact->c_iter) {
sub_compact->c_iter->status().PermitUncheckedError();
}
if (input) {
input->status().PermitUncheckedError();
}
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
sub_compact->c_iter.reset();
blob_counter.reset();
clip.reset();
raw_input.reset();
sub_compact->status = status;
NotifyOnSubcompactionCompleted(sub_compact);
}