in src/kudu/fs/log_block_manager.cc [2159:2425]
void LogBlockManager::OpenDataDir(DataDir* dir,
                                  FsReport* report,
                                  Status* result_status) {
  FsReport local_report;
  local_report.data_dirs.push_back(dir->dir());

  // We are going to perform these checks.
  //
  // Note: this isn't necessarily the complete set of FsReport checks; there
  // may be checks that the LBM cannot perform.
  local_report.full_container_space_check.emplace();
  local_report.incomplete_container_check.emplace();
  local_report.malformed_record_check.emplace();
  local_report.misaligned_block_check.emplace();
  local_report.partial_record_check.emplace();

  // Keep track of deleted blocks whose space hasn't been punched; they will
  // be repunched during repair.
  vector<scoped_refptr<internal::LogBlock>> need_repunching;

  // Keep track of containers that have nothing but dead blocks; they will be
  // deleted during repair.
  vector<string> dead_containers;

  // Keep track of containers whose live block ratio is low; their metadata
  // files will be compacted during repair.
  unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;

  // Find all containers and open them.
  unordered_set<string> containers_seen;
  vector<string> children;
  Status s = env_->GetChildren(dir->dir(), &children);
  if (!s.ok()) {
    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
        ErrorHandlerType::DISK_ERROR, dir));
    *result_status = s.CloneAndPrepend(Substitute(
        "Could not list children of $0", dir->dir()));
    return;
  }
  MonoTime last_opened_container_log_time = MonoTime::Now();
  for (const string& child : children) {
    string container_name;
    if (!TryStripSuffixString(
            child, LogBlockManager::kContainerDataFileSuffix, &container_name) &&
        !TryStripSuffixString(
            child, LogBlockManager::kContainerMetadataFileSuffix, &container_name)) {
      continue;
    }
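    // A container may be seen twice (once via its data file and once via its
    // metadata file); only open it the first time it is encountered.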
    if (!InsertIfNotPresent(&containers_seen, container_name)) {
      continue;
    }

    unique_ptr<LogBlockContainer> container;
    s = LogBlockContainer::Open(
        this, dir, &local_report, container_name, &container);
    if (s.IsAborted()) {
      // Skip the container. Open() added a record of it to 'local_report' for us.
      continue;
    }
    if (!s.ok()) {
      *result_status = s.CloneAndPrepend(Substitute(
          "Could not open container $0", container_name));
      return;
    }
    // Process the records, building a container-local map for live blocks and
    // a list of dead blocks.
    //
    // It's important that we don't try to add these blocks to the global map
    // incrementally as we see each record, since it's possible that one container
    // has a "CREATE <b>" while another has a "CREATE <b> ; DELETE <b>" pair.
    // If we processed those two containers in this order, then upon processing
    // the second container, we'd think there was a duplicate block. Building
    // the container-local map first ensures that we discount deleted blocks
    // before checking for duplicate IDs.
    //
    // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
    // exceedingly unlikely. However, we might have old data which still exhibits
    // the above issue.
    UntrackedBlockMap live_blocks;
    BlockRecordMap live_block_records;
    vector<scoped_refptr<internal::LogBlock>> dead_blocks;
    uint64_t max_block_id = 0;
    s = container->ProcessRecords(&local_report,
                                  &live_blocks,
                                  &live_block_records,
                                  &dead_blocks,
                                  &max_block_id);
    if (!s.ok()) {
      *result_status = s.CloneAndPrepend(Substitute(
          "Could not process records in container $0", container->ToString()));
      return;
    }

    // With deleted blocks out of the way, check for misaligned blocks.
    //
    // We could also enforce that the record's offset is aligned with the
    // underlying filesystem's block size, an invariant maintained by the log
    // block manager. However, due to KUDU-1793, that invariant may have been
    // broken, so we'll note but otherwise allow it.
    for (const auto& e : live_blocks) {
      if (PREDICT_FALSE(e.second->offset() %
                        container->instance()->filesystem_block_size_bytes() != 0)) {
        local_report.misaligned_block_check->entries.emplace_back(
            container->ToString(), e.first);
      }
    }
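    // Full containers (ones that no longer accept new blocks) get additional
    // scrutiny: dead container detection, metadata compaction candidacy, and
    // excess disk space accounting.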
    if (container->full()) {
      // Full containers without any live blocks can be deleted outright.
      //
      // TODO(adar): this should be reported as an inconsistency once dead
      // container deletion is also done in real time. Until then, it would be
      // confusing to report it as such since it'll be a natural event at startup.
      if (container->live_blocks() == 0) {
        DCHECK(live_blocks.empty());
        dead_containers.emplace_back(container->ToString());
      } else if (static_cast<double>(container->live_blocks()) /
                 container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
        // Metadata files of containers with very few live blocks will be compacted.
        //
        // TODO(adar): this should be reported as an inconsistency once
        // container metadata compaction is also done in real time. Until then,
        // it would be confusing to report it as such since it'll be a natural
        // event at startup.
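        // For example (illustrative numbers): with a compaction ratio of 0.5,
        // a full container holding 100 block records of which only 10 are
        // still live (a live ratio of 0.1) would have its metadata file
        // queued for compaction.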
        vector<BlockRecordPB> records(live_block_records.size());
        int i = 0;
        for (auto& e : live_block_records) {
          records[i].Swap(&e.second);
          i++;
        }

        // Sort the records such that their ordering reflects the ordering in
        // the pre-compacted metadata file.
        //
        // This is preferred to storing the records in an order-preserving
        // container (such as std::map) because while records are temporarily
        // retained for every container, only some containers will actually
        // undergo metadata compaction.
        std::sort(records.begin(), records.end(),
                  [](const BlockRecordPB& a, const BlockRecordPB& b) {
          // Sort by timestamp.
          if (a.timestamp_us() != b.timestamp_us()) {
            return a.timestamp_us() < b.timestamp_us();
          }

          // If the timestamps match, sort by offset.
          //
          // If the offsets also match (i.e. both blocks are of zero length),
          // it doesn't matter which of the two records comes first.
          return a.offset() < b.offset();
        });

        low_live_block_containers[container->ToString()] = std::move(records);
      }
      // Having processed the block records, let's check whether any full
      // containers have any extra space (left behind after a crash or from an
      // older version of Kudu).
      //
      // Filesystems are unpredictable beasts and may misreport the amount of
      // space allocated to a file in various interesting ways. Some examples:
      // - XFS's speculative preallocation feature may artificially enlarge the
      //   container's data file without updating its file size. This makes the
      //   file size untrustworthy for the purposes of measuring allocated space.
      //   See KUDU-1856 for more details.
      // - On el6.6/ext4 a container data file that consumed ~32K according to
      //   its extent tree was actually reported as consuming an additional fs
      //   block (2k) of disk space. A similar container data file (generated
      //   via the same workload) on Ubuntu 16.04/ext4 did not exhibit this.
      //   The suspicion is that older versions of ext4 include interior nodes
      //   of the extent tree when reporting file block usage.
      //
      // To deal with these issues, our extra space cleanup code (deleted block
      // repunching and container truncation) is gated on an "actual disk space
      // consumed" heuristic. To prevent unnecessary triggering of the
      // heuristic, we allow for some slop in our size measurements. The exact
      // amount of slop is configurable via
      // log_container_excess_space_before_cleanup_fraction.
      //
      // Too little slop and we'll do unnecessary work at startup. Too much and
      // more unused space may go unreclaimed.
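      // For example (illustrative numbers): with 64 MiB of live aligned data
      // and log_container_excess_space_before_cleanup_fraction=0.10, the
      // cleanup threshold below works out to ~70.4 MiB; a data file whose
      // on-disk size exceeds that is flagged for cleanup.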
      string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix);
      uint64_t reported_size;
      s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
      if (!s.ok()) {
        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
            ErrorHandlerType::DISK_ERROR, dir));
        *result_status = s.CloneAndPrepend(Substitute(
            "Could not get on-disk file size of container $0", container->ToString()));
        return;
      }
      int64_t cleanup_threshold_size = container->live_bytes_aligned() *
          (1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
      if (reported_size > cleanup_threshold_size) {
        local_report.full_container_space_check->entries.emplace_back(
            container->ToString(), reported_size - container->live_bytes_aligned());

        // If the container is to be deleted outright, don't bother repunching
        // its blocks. The report entry remains, however, so it's clear that
        // there was a space discrepancy.
        if (container->live_blocks()) {
          need_repunching.insert(need_repunching.end(),
                                 dead_blocks.begin(), dead_blocks.end());
        }
      }

      local_report.stats.lbm_full_container_count++;
    }
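    // Accumulate per-directory statistics for this container.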
    local_report.stats.live_block_bytes += container->live_bytes();
    local_report.stats.live_block_bytes_aligned += container->live_bytes_aligned();
    local_report.stats.live_block_count += container->live_blocks();
    local_report.stats.lbm_container_count++;

    // Log the number of containers opened so far, at most once every 10 seconds.
    MonoTime now = MonoTime::Now();
    if ((now - last_opened_container_log_time).ToSeconds() > 10) {
      LOG(INFO) << Substitute("Opened $0 log block containers in $1",
                              local_report.stats.lbm_container_count, dir->dir());
      last_opened_container_log_time = now;
    }
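    // Make sure the block ID allocator will never hand out an ID at or below
    // the largest one seen in this container.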
    next_block_id_.StoreMax(max_block_id + 1);

    // Under the lock, merge this map into the main block map and add
    // the container.
    {
      std::lock_guard<simple_spinlock> l(lock_);

      // To avoid cacheline contention during startup, we aggregate all of the
      // memory in a local and add it to the mem-tracker in a single increment
      // at the end of this loop.
      int64_t mem_usage = 0;
      for (UntrackedBlockMap::value_type& e : live_blocks) {
        int block_mem = kudu_malloc_usable_size(e.second.get());
        if (!AddLogBlockUnlocked(std::move(e.second))) {
          // TODO(adar): track as an inconsistency?
          LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
                     << " which is already alive from another container, when "
                     << "processing container " << container->ToString();
        }
        mem_usage += block_mem;
      }

      mem_tracker_->Consume(mem_usage);
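      // Publish the container: register it with the block manager and, if it
      // still has room, make it available for new block writes.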
      AddNewContainerUnlocked(container.get());
      MakeContainerAvailableUnlocked(container.release());
    }
  }

  // Like the rest of Open(), repairs are performed per data directory to take
  // advantage of parallelism.
  s = Repair(dir,
             &local_report,
             std::move(need_repunching),
             std::move(dead_containers),
             std::move(low_live_block_containers));
  if (!s.ok()) {
    *result_status = s.CloneAndPrepend(Substitute(
        "fatal error while repairing inconsistencies in data directory $0",
        dir->dir()));
    return;
  }

  *report = std::move(local_report);
  *result_status = Status::OK();
}