Status DataDirManager::Open()

in src/kudu/fs/data_dirs.cc [643:873]


// Opens the data directories managed by this DataDirManager and builds the
// in-memory structures that describe them.
//
// The steps, in order:
//  1. Load the instance metadata file of every data directory.
//  2. In UPDATE_ON_DISK mode, generate UUIDs for the roots whose instance was
//     reported as NotFound, create/update the instances on disk, and reload.
//  3. Unless in IGNORE_INCONSISTENCY mode, check the integrity of the loaded
//     instances.
//  4. Create one DataDir (with its own single-threaded pool) per instance and
//     run DeleteTmpFilesRecursively on the healthy ones.
//  5. Build the UUID / UUID index / directory maps, record the failed
//     directories, and swap everything into the member fields.
//  6. Refresh the 'fullness' of every directory that is not marked failed.
//
// Returns OK on success. Returns InvalidArgument if UPDATE_ON_DISK is
// requested with the "file" block manager, IOError if a healthy instance's
// UUID is missing from its own path set, NotSupported if a UUID index exceeds
// the block manager's limit, or the first error propagated from a helper.
Status DataDirManager::Open() {
  const int kMaxDataDirs = opts_.block_manager_type == "file" ? (1 << 16) - 1 : kint32max;

  // Find and load existing data directory instances.
  vector<unique_ptr<PathInstanceMetadataFile>> loaded_instances;
  RETURN_NOT_OK(LoadInstances(&loaded_instances));

  // Add new or remove existing data directories, if desired.
  if (opts_.consistency_check == ConsistencyCheckBehavior::UPDATE_ON_DISK) {
    if (opts_.block_manager_type == "file") {
      return Status::InvalidArgument(
          "file block manager may not add or remove data directories");
    }

    // Prepare to create new directories and update existing instances. We
    // must generate a new UUID for each missing root, and update all_uuids in
    // all existing instances to include those new UUIDs.
    //
    // Note: all data directories must be healthy to perform this operation.
    ObjectIdGenerator gen;
    vector<string> new_all_uuids;
    vector<pair<string, string>> root_uuid_pairs_to_create;
    for (const auto& instance : loaded_instances) {
      if (instance->health_status().IsNotFound()) {
        // Missing instance: give it a fresh UUID and schedule its creation.
        string uuid = gen.Next();
        new_all_uuids.emplace_back(uuid);
        root_uuid_pairs_to_create.emplace_back(DirName(instance->dir()), std::move(uuid));
        continue;
      }
      // Any failure other than NotFound aborts the update.
      RETURN_NOT_OK_PREPEND(
          instance->health_status(),
          "found failed data directory while adding new data directories");
      new_all_uuids.emplace_back(instance->metadata()->path_set().uuid());
    }
    RETURN_NOT_OK_PREPEND(
        CreateNewDataDirectoriesAndUpdateInstances(
            std::move(root_uuid_pairs_to_create),
            std::move(loaded_instances),
            std::move(new_all_uuids)),
            "could not add new data directories");

    // Now that we've created the missing directories, try loading the
    // directories again.
    //
    // Note: 'loaded_instances' must be cleared to unlock the instance files.
    loaded_instances.clear();
    RETURN_NOT_OK(LoadInstances(&loaded_instances));
    for (const auto& instance : loaded_instances) {
      RETURN_NOT_OK_PREPEND(instance->health_status(),
          "found failed data directory after updating data directories");
    }
  }

  // Check the integrity of all loaded instances.
  if (opts_.consistency_check != ConsistencyCheckBehavior::IGNORE_INCONSISTENCY) {
    RETURN_NOT_OK_PREPEND(
        PathInstanceMetadataFile::CheckIntegrity(loaded_instances),
        Substitute("could not verify integrity of files: $0",
                   JoinStrings(GetDataDirs(), ",")));
  }

  // All instances are present and accounted for. Time to create the in-memory
  // data directory structures.
  int pool_num = 0;  // Only used to name the per-directory thread pools.
  vector<unique_ptr<DataDir>> dds;
  for (auto& instance : loaded_instances) {
    // Copied because 'instance' is moved into the DataDir below.
    const string data_dir = instance->dir();

    // Create a per-dir thread pool.
    gscoped_ptr<ThreadPool> pool;
    RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", pool_num))
                  .set_max_threads(1)
                  .set_trace_metric_prefix("data dirs")
                  .Build(&pool));

    // Figure out what filesystem the data directory is on. Unhealthy
    // instances are not probed and keep the OTHER default.
    DataDirFsType fs_type = DataDirFsType::OTHER;
    if (instance->healthy()) {
      bool result;
      RETURN_NOT_OK(env_->IsOnExtFilesystem(data_dir, &result));
      if (result) {
        fs_type = DataDirFsType::EXT;
      } else {
        RETURN_NOT_OK(env_->IsOnXfsFilesystem(data_dir, &result));
        if (result) {
          fs_type = DataDirFsType::XFS;
        }
      }
    }

    unique_ptr<DataDir> dd(new DataDir(
        env_, metrics_.get(), fs_type, data_dir, std::move(instance),
        unique_ptr<ThreadPool>(pool.release())));
    dds.emplace_back(std::move(dd));
    pool_num++;
  }

  // Use the per-dir thread pools to delete temporary files in parallel.
  for (const auto& dd : dds) {
    if (dd->instance()->healthy()) {
      dd->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dd->dir()));
    }
  }
  for (const auto& dd : dds) {
    dd->WaitOnClosures();
  }

  // Build in-memory maps of on-disk state. They are populated as locals and
  // only swapped into the members once fully built.
  UuidByRootMap uuid_by_root;
  UuidByUuidIndexMap uuid_by_idx;
  UuidIndexByUuidMap idx_by_uuid;
  UuidIndexMap dd_by_uuid_idx;
  ReverseUuidIndexMap uuid_idx_by_dd;
  TabletsByUuidIndexMap tablets_by_uuid_idx_map;
  FailedDataDirSet failed_data_dirs;

  // Registers the (index, uuid, directory) triple in every map at once.
  const auto insert_to_maps = [&] (int idx, string uuid, DataDir* dd) {
    InsertOrDie(&uuid_by_root, DirName(dd->dir()), uuid);
    InsertOrDie(&uuid_by_idx, idx, uuid);
    InsertOrDie(&idx_by_uuid, uuid, idx);
    InsertOrDie(&dd_by_uuid_idx, idx, dd);
    InsertOrDie(&uuid_idx_by_dd, dd, idx);
    InsertOrDie(&tablets_by_uuid_idx_map, idx, {});
  };

  if (opts_.consistency_check != ConsistencyCheckBehavior::IGNORE_INCONSISTENCY) {
    // If we're not in IGNORE_INCONSISTENCY mode, we're guaranteed that the
    // healthy instances match from the above integrity check, so we can assign
    // each healthy directory a UUID in accordance with its instance file.
    //
    // A directory may not have been assigned a UUID because its instance file
    // could not be read, in which case, we track it and assign a UUID to it
    // later if we can.
    vector<DataDir*> unassigned_dirs;
    int first_healthy = -1;
    for (int dir = 0; dir < dds.size(); dir++) {
      const auto& dd = dds[dir];
      if (PREDICT_FALSE(!dd->instance()->healthy())) {
        // Keep track of failed directories so we can assign them UUIDs later.
        unassigned_dirs.push_back(dd.get());
        continue;
      }
      if (first_healthy == -1) {
        first_healthy = dir;
      }
      // The UUID index of a directory is the position of its own UUID within
      // the 'all_uuids' list of its path set.
      const PathSetPB& path_set = dd->instance()->metadata()->path_set();
      int idx = -1;
      for (int pos = 0; pos < path_set.all_uuids_size(); pos++) {
        if (path_set.uuid() == path_set.all_uuids(pos)) {
          idx = pos;
          break;
        }
      }
      if (idx == -1) {
        return Status::IOError(Substitute(
            "corrupt path set for data directory $0: uuid $1 not found in path set",
            dd->dir(), path_set.uuid()));
      }
      // NOTE(review): 'idx' is zero-based, so idx == kMaxDataDirs implies
      // kMaxDataDirs + 1 paths; confirm whether '>=' is the intended bound.
      if (idx > kMaxDataDirs) {
        return Status::NotSupported(
            Substitute("block manager supports a maximum of $0 paths", kMaxDataDirs));
      }
      insert_to_maps(idx, path_set.uuid(), dd.get());
    }
    CHECK_NE(first_healthy, -1); // Guaranteed by LoadInstances().

    // If the uuid index was not assigned, assign it to a failed directory. Use
    // the path set from the first healthy instance. It is only read here, and
    // 'dds' is not modified until the swap below, so a reference suffices.
    const PathSetPB& path_set = dds[first_healthy]->instance()->metadata()->path_set();
    size_t failed_dir_idx = 0;
    for (int uuid_idx = 0; uuid_idx < path_set.all_uuids_size(); uuid_idx++) {
      if (!ContainsKey(uuid_by_idx, uuid_idx)) {
        // Verify there is a failed directory left before indexing, so that a
        // mismatch is reported here instead of reading out of bounds.
        CHECK_LT(failed_dir_idx, unassigned_dirs.size())
            << "more unassigned UUIDs than failed data directories";
        const string& unassigned_uuid = path_set.all_uuids(uuid_idx);
        insert_to_maps(uuid_idx, unassigned_uuid, unassigned_dirs[failed_dir_idx]);

        // Record the directory as failed.
        if (metrics_) {
          metrics_->data_dirs_failed->IncrementBy(1);
        }
        InsertOrDie(&failed_data_dirs, uuid_idx);
        failed_dir_idx++;
      }
    }
    // Every failed directory must have received a UUID.
    CHECK_EQ(unassigned_dirs.size(), failed_dir_idx);
  } else {
    // If we are in IGNORE_INCONSISTENCY mode, all bets are off. The most we
    // can do is make a best effort assignment of data dirs to UUIDs based on
    // the ones that are healthy, and for the sake of completeness, assign
    // artificial UUIDs to the unhealthy ones.
    for (int dir = 0; dir < dds.size(); dir++) {
      DataDir* dd = dds[dir].get();
      if (dd->instance()->healthy()) {
        insert_to_maps(dir, dd->instance()->metadata()->path_set().uuid(), dd);
      } else {
        insert_to_maps(dir, Substitute("<unknown uuid $0>", dir), dd);
        InsertOrDie(&failed_data_dirs, dir);
      }
    }
  }

  data_dirs_.swap(dds);
  uuid_by_idx_.swap(uuid_by_idx);
  idx_by_uuid_.swap(idx_by_uuid);
  data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
  uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
  tablets_by_uuid_idx_map_.swap(tablets_by_uuid_idx_map);
  failed_data_dirs_.swap(failed_data_dirs);
  uuid_by_root_.swap(uuid_by_root);

  // From this point onwards, the above in-memory maps must be consistent with
  // the main path set.

  // Initialize the 'fullness' status of the data directories.
  for (const auto& dd : data_dirs_) {
    int uuid_idx;
    CHECK(FindUuidIndexByDataDir(dd.get(), &uuid_idx));
    if (ContainsKey(failed_data_dirs_, uuid_idx)) {
      continue;
    }
    Status refresh_status = dd->RefreshIsFull(DataDir::RefreshMode::ALWAYS);
    if (PREDICT_FALSE(!refresh_status.ok())) {
      // A disk failure only marks this directory as failed; any other error
      // aborts Open().
      if (refresh_status.IsDiskFailure()) {
        RETURN_NOT_OK(MarkDataDirFailed(uuid_idx, refresh_status.ToString()));
        continue;
      }
      return refresh_status;
    }
  }

  return Status::OK();
}