void LogBlockManager::OpenDataDir()

in src/kudu/fs/log_block_manager.cc [2159:2425]


void LogBlockManager::OpenDataDir(DataDir* dir,
                                  FsReport* report,
                                  Status* result_status) {
  FsReport local_report;
  local_report.data_dirs.push_back(dir->dir());

  // We are going to perform these checks.
  //
  // Note: this isn't necessarily the complete set of FsReport checks; there
  // may be checks that the LBM cannot perform.
  local_report.full_container_space_check.emplace();
  local_report.incomplete_container_check.emplace();
  local_report.malformed_record_check.emplace();
  local_report.misaligned_block_check.emplace();
  local_report.partial_record_check.emplace();

  // Keep track of deleted blocks whose space hasn't been punched; they will
  // be repunched during repair.
  vector<scoped_refptr<internal::LogBlock>> need_repunching;

  // Keep track of containers that have nothing but dead blocks; they will be
  // deleted during repair.
  vector<string> dead_containers;

  // Keep track of containers whose live block ratio is low; their metadata
  // files will be compacted during repair.
  unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;

  // Find all containers and open them.
  unordered_set<string> containers_seen;
  vector<string> children;
  Status s = env_->GetChildren(dir->dir(), &children);
  if (!s.ok()) {
    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
        ErrorHandlerType::DISK_ERROR, dir));
    *result_status = s.CloneAndPrepend(Substitute(
        "Could not list children of $0", dir->dir()));
    return;
  }
  MonoTime last_opened_container_log_time = MonoTime::Now();
  for (const string& child : children) {
    string container_name;
    if (!TryStripSuffixString(
            child, LogBlockManager::kContainerDataFileSuffix, &container_name) &&
        !TryStripSuffixString(
            child, LogBlockManager::kContainerMetadataFileSuffix, &container_name)) {
      continue;
    }
    if (!InsertIfNotPresent(&containers_seen, container_name)) {
      continue;
    }

    unique_ptr<LogBlockContainer> container;
    s = LogBlockContainer::Open(
        this, dir, &local_report, container_name, &container);
    if (s.IsAborted()) {
      // Skip the container. Open() added a record of it to 'local_report' for us.
      continue;
    }
    if (!s.ok()) {
      *result_status = s.CloneAndPrepend(Substitute(
          "Could not open container $0", container_name));
      return;
    }

    // Process the records, building a container-local map for live blocks and
    // a list of dead blocks.
    //
    // It's important that we don't try to add these blocks to the global map
    // incrementally as we see each record, since it's possible that one container
    // has a "CREATE <b>" while another has a "CREATE <b> ; DELETE <b>" pair.
    // If we processed those two containers in this order, then upon processing
    // the second container, we'd think there was a duplicate block. Building
    // the container-local map first ensures that we discount deleted blocks
    // before checking for duplicate IDs.
    //
    // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
    // exceedingly unlikely. However, we might have old data which still exhibits
    // the above issue.
    UntrackedBlockMap live_blocks;
    BlockRecordMap live_block_records;
    vector<scoped_refptr<internal::LogBlock>> dead_blocks;
    uint64_t max_block_id = 0;
    s = container->ProcessRecords(&local_report,
                                  &live_blocks,
                                  &live_block_records,
                                  &dead_blocks,
                                  &max_block_id);
    if (!s.ok()) {
      *result_status = s.CloneAndPrepend(Substitute(
          "Could not process records in container $0", container->ToString()));
      return;
    }

    // With deleted blocks out of the way, check for misaligned blocks.
    //
    // We could also enforce that the record's offset is aligned with the
    // underlying filesystem's block size, an invariant maintained by the log
    // block manager. However, due to KUDU-1793, that invariant may have been
    // broken, so we'll note but otherwise allow it.
    for (const auto& e : live_blocks) {
      if (PREDICT_FALSE(e.second->offset() %
                        container->instance()->filesystem_block_size_bytes() != 0)) {
        local_report.misaligned_block_check->entries.emplace_back(
            container->ToString(), e.first);
      }
    }

    if (container->full()) {
      // Full containers without any live blocks can be deleted outright.
      //
      // TODO(adar): this should be reported as an inconsistency once dead
      // container deletion is also done in real time. Until then, it would be
      // confusing to report it as such since it'll be a natural event at startup.
      if (container->live_blocks() == 0) {
        DCHECK(live_blocks.empty());
        dead_containers.emplace_back(container->ToString());
      } else if (static_cast<double>(container->live_blocks()) /
          container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
        // Metadata files of containers with very few live blocks will be compacted.
        //
        // TODO(adar): this should be reported as an inconsistency once
        // container metadata compaction is also done in real time. Until then,
        // it would be confusing to report it as such since it'll be a natural
        // event at startup.
        vector<BlockRecordPB> records(live_block_records.size());
        int i = 0;
        for (auto& e : live_block_records) {
          records[i].Swap(&e.second);
          i++;
        }

        // Sort the records such that their ordering reflects the ordering in
        // the pre-compacted metadata file.
        //
        // This is preferred to storing the records in an order-preserving
        // container (such as std::map) because while records are temporarily
        // retained for every container, only some containers will actually
        // undergo metadata compaction.
        std::sort(records.begin(), records.end(),
                  [](const BlockRecordPB& a, const BlockRecordPB& b) {
          // Sort by timestamp.
          if (a.timestamp_us() != b.timestamp_us()) {
            return a.timestamp_us() < b.timestamp_us();
          }

          // If the timestamps match, sort by offset.
          //
          // If the offsets also match (i.e. both blocks are of zero length),
          // it doesn't matter which of the two records comes first.
          return a.offset() < b.offset();
        });

        low_live_block_containers[container->ToString()] = std::move(records);
      }

      // Having processed the block records, let's check whether any full
      // containers have any extra space (left behind after a crash or from an
      // older version of Kudu).
      //
      // Filesystems are unpredictable beasts and may misreport the amount of
      // space allocated to a file in various interesting ways. Some examples:
      // - XFS's speculative preallocation feature may artificially enlarge the
      //   container's data file without updating its file size. This makes the
      //   file size untrustworthy for the purposes of measuring allocated space.
      //   See KUDU-1856 for more details.
      // - On el6.6/ext4 a container data file that consumed ~32K according to
      //   its extent tree was actually reported as consuming an additional fs
      //   block (2k) of disk space. A similar container data file (generated
      //   via the same workload) on Ubuntu 16.04/ext4 did not exhibit this.
      //   The suspicion is that older versions of ext4 include interior nodes
      //   of the extent tree when reporting file block usage.
      //
      // To deal with these issues, our extra space cleanup code (deleted block
      // repunching and container truncation) is gated on an "actual disk space
      // consumed" heuristic. To prevent unnecessary triggering of the
      // heuristic, we allow for some slop in our size measurements. The exact
      // amount of slop is configurable via
      // log_container_excess_space_before_cleanup_fraction.
      //
      // Too little slop and we'll do unnecessary work at startup. Too much and
      // more unused space may go unreclaimed.
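      //
      // As a purely illustrative example (numbers are hypothetical, not taken
      // from this file): with live_bytes_aligned() at 64 MiB and a cleanup
      // fraction of 0.10, the threshold computed below is ~70.4 MiB, so a
      // data file reported at 72 MiB on disk would be flagged for cleanup
      // while one reported at 68 MiB would be left alone.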
      string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix);
      uint64_t reported_size;
      s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
      if (!s.ok()) {
        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
            ErrorHandlerType::DISK_ERROR, dir));
        *result_status = s.CloneAndPrepend(Substitute(
            "Could not get on-disk file size of container $0", container->ToString()));
        return;
      }
      int64_t cleanup_threshold_size = container->live_bytes_aligned() *
          (1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
      if (reported_size > cleanup_threshold_size) {
        local_report.full_container_space_check->entries.emplace_back(
            container->ToString(), reported_size - container->live_bytes_aligned());

        // If the container is to be deleted outright, don't bother repunching
        // its blocks. The report entry remains, however, so it's clear that
        // there was a space discrepancy.
        if (container->live_blocks()) {
          need_repunching.insert(need_repunching.end(),
                                 dead_blocks.begin(), dead_blocks.end());
        }
      }

      local_report.stats.lbm_full_container_count++;
    }
    local_report.stats.live_block_bytes += container->live_bytes();
    local_report.stats.live_block_bytes_aligned += container->live_bytes_aligned();
    local_report.stats.live_block_count += container->live_blocks();
    local_report.stats.lbm_container_count++;

    // Log the number of containers opened every 10 seconds.
    MonoTime now = MonoTime::Now();
    if ((now - last_opened_container_log_time).ToSeconds() > 10) {
      LOG(INFO) << Substitute("Opened $0 log block containers in $1",
                              local_report.stats.lbm_container_count, dir->dir());
      last_opened_container_log_time = now;
    }

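    // Ensure that subsequently allocated block IDs are greater than any ID
    // seen in this container.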
    next_block_id_.StoreMax(max_block_id + 1);

    // Under the lock, merge this map into the main block map and add
    // the container.
    {
      std::lock_guard<simple_spinlock> l(lock_);
      // To avoid cacheline contention during startup, we aggregate all of the
      // memory in a local and add it to the mem-tracker in a single increment
      // at the end of this loop.
      int64_t mem_usage = 0;
      for (UntrackedBlockMap::value_type& e : live_blocks) {
        int block_mem = kudu_malloc_usable_size(e.second.get());
        if (!AddLogBlockUnlocked(std::move(e.second))) {
          // TODO(adar): track as an inconsistency?
          LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
                     << " which already is alive from another container when "
                     << " processing container " << container->ToString();
        }
        mem_usage += block_mem;
      }

      mem_tracker_->Consume(mem_usage);
      AddNewContainerUnlocked(container.get());
      MakeContainerAvailableUnlocked(container.release());
    }
  }

  // Like the rest of Open(), repairs are performed per data directory to take
  // advantage of parallelism.
  s = Repair(dir,
             &local_report,
             std::move(need_repunching),
             std::move(dead_containers),
             std::move(low_live_block_containers));
  if (!s.ok()) {
    *result_status = s.CloneAndPrepend(Substitute(
        "fatal error while repairing inconsistencies in data directory $0",
        dir->dir()));
    return;
  }

  *report = std::move(local_report);
  *result_status = Status::OK();
}
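
For context: as the comment before the Repair() call notes, container scanning and repair happen per data directory so that multiple disks can be processed in parallel. The sketch below is a minimal illustration of how a caller could fan OpenDataDir() out across directories and fold the per-directory results into one report; the names OpenAllDataDirsSketch, pool_, SubmitTask, Wait, and MergeFrom are illustrative assumptions, not the actual Open() implementation in this file.

// Minimal caller sketch (assumed names, not from log_block_manager.cc):
// run OpenDataDir() once per data directory in parallel, then combine the
// per-directory FsReports and Statuses into a single result.
Status LogBlockManager::OpenAllDataDirsSketch(const std::vector<DataDir*>& dirs,
                                              FsReport* merged_report) {
  std::vector<FsReport> reports(dirs.size());
  std::vector<Status> statuses(dirs.size());
  for (size_t i = 0; i < dirs.size(); i++) {
    // One task per directory; each disk is scanned and repaired independently.
    pool_->SubmitTask([this, i, &dirs, &reports, &statuses]() {
      OpenDataDir(dirs[i], &reports[i], &statuses[i]);
    });
  }
  pool_->Wait();

  // Surface the first per-directory failure, if any; otherwise merge reports.
  for (size_t i = 0; i < dirs.size(); i++) {
    RETURN_NOT_OK(statuses[i]);
    merged_report->MergeFrom(reports[i]);
  }
  return Status::OK();
}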