Status LogBlockManager::Repair()

in src/kudu/fs/log_block_manager.cc [2441:2679]


Status LogBlockManager::Repair(
    DataDir* dir,
    FsReport* report,
    vector<scoped_refptr<internal::LogBlock>> need_repunching,
    vector<string> dead_containers,
    unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) {
  if (opts_.read_only) {
    LOG(INFO) << "Read-only block manager, skipping repair";
    return Status::OK();
  }
  if (report->HasFatalErrors()) {
    LOG(WARNING) << "Found fatal and irreparable errors, skipping repair";
    return Status::OK();
  }

  // From here on out we're committed to repairing.

  // Fetch all the containers we're going to need.
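  // The container pointers are looked up once here, while holding lock_, so
  // that the repair steps below can use them without reacquiring the lock.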
  unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
  {
    std::lock_guard<simple_spinlock> l(lock_);

    // Remove all of the dead containers from the block manager. They will be
    // deleted from disk shortly thereafter, outside of the lock.
    for (const auto& d : dead_containers) {
      RemoveFullContainerUnlocked(d);
    }

    // Fetch all the containers we're going to need.
    if (report->partial_record_check) {
      for (const auto& pr : report->partial_record_check->entries) {
        LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
                                             pr.container);
        if (c) {
          containers_by_name[pr.container] = c;
        }
      }
    }
    if (report->full_container_space_check) {
      for (const auto& fcp : report->full_container_space_check->entries) {
        LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
                                             fcp.container);
        if (c) {
          containers_by_name[fcp.container] = c;
        }
      }
    }
    for (const auto& e : low_live_block_containers) {
      LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
                                           e.first);
      if (c) {
        containers_by_name[e.first] = c;
      }
    }
  }

  // Delete all dead containers.
  //
  // After the deletions, the data directory is sync'ed to reduce the chance
  // of a data file existing without its corresponding metadata file (or vice
  // versa) in the event of a crash. The block manager would treat such a case
  // as corruption and require manual intervention.
  //
  // TODO(adar) the above is not fool-proof; a crash could manifest in between
  // any pair of deletions. That said, the odds of it happening are incredibly
  // rare, and manual resolution isn't hard (just delete the existing file).
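  //
  // The size of each metadata file is captured before deletion so the total
  // number of reclaimed metadata bytes can be logged once the loop finishes.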
  int64_t deleted_metadata_bytes = 0;
  for (const auto& d : dead_containers) {
    string data_file_name = StrCat(d, kContainerDataFileSuffix);
    string metadata_file_name = StrCat(d, kContainerMetadataFileSuffix);

    uint64_t metadata_size;
    Status s = env_->GetFileSize(metadata_file_name, &metadata_size);
    if (s.ok()) {
      deleted_metadata_bytes += metadata_size;
    } else {
      WARN_NOT_OK_LBM_DISK_FAILURE(s,
          "Could not get size of dead container metadata file " + metadata_file_name);
    }

    WARN_NOT_OK_LBM_DISK_FAILURE(file_cache_.DeleteFile(data_file_name),
                "Could not delete dead container data file " + data_file_name);
    WARN_NOT_OK_LBM_DISK_FAILURE(file_cache_.DeleteFile(metadata_file_name),
                "Could not delete dead container metadata file " + metadata_file_name);
  }
  if (!dead_containers.empty()) {
    WARN_NOT_OK_LBM_DISK_FAILURE(env_->SyncDir(dir->dir()), "Could not sync data directory");
    LOG(INFO) << Substitute("Deleted $0 dead containers ($1 metadata bytes)",
                            dead_containers.size(), deleted_metadata_bytes);
  }

  // Truncate partial metadata records.
  //
  // This is a fatal inconsistency; if the repair fails, we cannot proceed.
  if (report->partial_record_check) {
    for (auto& pr : report->partial_record_check->entries) {
      unique_ptr<RWFile> file;
      RWFileOptions opts;
      opts.mode = Env::OPEN_EXISTING;
      internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
                                                             pr.container);
      if (!container) {
        // The container was deleted outright.
        pr.repaired = true;
        continue;
      }
      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(
          env_->NewRWFile(opts,
                          StrCat(pr.container, kContainerMetadataFileSuffix),
                          &file),
          "could not reopen container to truncate partial metadata record");

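      // pr.offset marks where the partial record begins, so truncating to it
      // drops only the incomplete trailing record while preserving every
      // complete record before it.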
      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Truncate(pr.offset),
          "could not truncate partial metadata record");

      // Technically we've "repaired" the inconsistency if the truncation
      // succeeded, even if the following logic fails.
      pr.repaired = true;

      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Close(),
          "could not close container after truncating partial metadata record");

      // Reopen the PB writer so that it will refresh its metadata about the
      // underlying file and resume appending to the new end of the file.
      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(container->ReopenMetadataWriter(),
          "could not reopen container metadata file");
    }
  }

  // Delete any incomplete container files.
  //
  // This is a non-fatal inconsistency; we can just as easily ignore the
  // leftover container files.
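  //
  // An incomplete container is one whose creation never finished (e.g. a
  // missing or zero-length data or metadata file), so it cannot contain any
  // live blocks.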
  if (report->incomplete_container_check) {
    for (auto& ic : report->incomplete_container_check->entries) {
      Status s = env_->DeleteFile(
          StrCat(ic.container, kContainerMetadataFileSuffix));
      if (!s.ok() && !s.IsNotFound()) {
        WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container metadata file");
      }

      s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix));
      if (!s.ok() && !s.IsNotFound()) {
        WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container data file");
      }
      ic.repaired = true;
    }
  }

  // Truncate any excess preallocated space in full containers.
  //
  // This is a non-fatal inconsistency; we can just as easily ignore the extra
  // disk space consumption.
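  //
  // A full container no longer accepts new blocks, so any preallocated space
  // past its last block is wasted and can simply be cut off.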
  if (report->full_container_space_check) {
    for (auto& fcp : report->full_container_space_check->entries) {
      internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
                                                             fcp.container);
      if (!container) {
        // The container was deleted outright.
        fcp.repaired = true;
        continue;
      }

      Status s = container->TruncateDataToNextBlockOffset();
      if (s.ok()) {
        fcp.repaired = true;
      }
      WARN_NOT_OK(s, "could not truncate excess preallocated space");
    }
  }

  // Repunch all requested holes. Any excess space reclaimed was already
  // tracked by LBMFullContainerSpaceCheck.
  //
  // Register all deletions with a single BlockDeletionTransaction so that the
  // repunched holes belonging to the same container can be coalesced.
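  //
  // (Hole punching returns a block's byte range to the filesystem without
  // changing the container data file's apparent size.)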
  shared_ptr<LogBlockDeletionTransaction> transaction =
      std::make_shared<LogBlockDeletionTransaction>(this);
  for (const auto& b : need_repunching) {
    b->RegisterDeletion(transaction);
    transaction->AddBlock(b);
  }

  // Clearing this vector drops the last references to the LogBlocks within,
  // triggering the repunching operations.
  need_repunching.clear();

  // "Compact" metadata files with few live blocks by rewriting them with only
  // the live block records.
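  //
  // Metadata dominated by records for since-deleted blocks wastes disk space
  // and slows the record scan performed when the block manager is opened;
  // keeping only the live records addresses both.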
  int64_t metadata_files_compacted = 0;
  int64_t metadata_bytes_delta = 0;
  for (const auto& e : low_live_block_containers) {
    internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
                                                           e.first);
    if (!container) {
      // The container was deleted outright.
      continue;
    }

    // Rewrite this metadata file. Failures are non-fatal.
    int64_t file_bytes_delta;
    const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
    Status s = RewriteMetadataFile(*container, e.second, &file_bytes_delta);
    if (!s.ok()) {
      WARN_NOT_OK(s, "could not rewrite metadata file");
      continue;
    }

    // However, we're hosed if we can't open the new metadata file.
    RETURN_NOT_OK_PREPEND(container->ReopenMetadataWriter(),
                          "could not reopen new metadata file");

    metadata_files_compacted++;
    metadata_bytes_delta += file_bytes_delta;
    VLOG(1) << "Compacted metadata file " << meta_path
            << " (saved " << file_bytes_delta << " bytes)";
  }

  // The data directory can be synchronized once for all of the new metadata files.
  //
  // Non-disk failures are fatal: if a new metadata file doesn't durably exist
  // in the data directory, it would be unsafe to append new block records to
  // it. This is because after a crash the old metadata file may appear
  // instead, and that file lacks the newly appended block records.
  //
  // TODO(awong): The below will only be true with persistent disk states.
  // Disk failures do not suffer from this issue because, on the next startup,
  // the entire directory will not be used.
  if (metadata_files_compacted > 0) {
    Status s = env_->SyncDir(dir->dir());
    RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(s, "Could not sync data directory");
    LOG(INFO) << Substitute("Compacted $0 metadata files ($1 metadata bytes)",
                            metadata_files_compacted, metadata_bytes_delta);
  }

  return Status::OK();
}
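
The repair path above relies on directory synchronization for crash safety:
after container files are deleted or rewritten, SyncDir() makes the updated
directory entries durable so that a crash cannot resurrect stale files. The
sketch below illustrates the generic POSIX write-temp/rename/sync-directory
pattern behind that reasoning; it is an illustration only, not Kudu's Env
implementation, and DurablyReplace() is a hypothetical helper.

#include <fcntl.h>
#include <unistd.h>

#include <cstdio>
#include <string>

// Durably replace the file at 'path' (inside directory 'dir') with
// 'contents': write a temporary file, fsync it, rename it over the
// original, then fsync the parent directory so that the rename itself
// survives a crash. Skipping the final directory sync risks the old file
// reappearing after a crash, which is the hazard described in the
// comments above.
bool DurablyReplace(const std::string& dir, const std::string& path,
                    const std::string& contents) {
  const std::string tmp = path + ".tmp";
  int fd = open(tmp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (fd < 0) return false;
  bool ok = write(fd, contents.data(), contents.size()) ==
                static_cast<ssize_t>(contents.size()) &&
            fsync(fd) == 0;
  ok = (close(fd) == 0) && ok;
  if (!ok) return false;

  if (std::rename(tmp.c_str(), path.c_str()) != 0) return false;

  // Sync the parent directory to make the rename durable.
  int dirfd = open(dir.c_str(), O_RDONLY | O_DIRECTORY);
  if (dirfd < 0) return false;
  ok = (fsync(dirfd) == 0);
  close(dirfd);
  return ok;
}

Syncing the directory once per batch, as Repair() does after deleting dead
containers and again after compacting metadata files, amortizes that cost
across all of the affected files.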