Status AnalyzeCatalogs()

in src/kudu/tools/tool_action_hms.cc [276:371]


Status AnalyzeCatalogs(const string& master_addrs,
                       HmsCatalog* hms_catalog,
                       KuduClient* kudu_client,
                       CatalogReport* report) {
  // Step 1: retrieve all Kudu tables, and aggregate them by ID and by name. The
  // by-ID map will be used to match the HMS Kudu table entries. The by-name map
  // will be used to match against legacy Impala/Kudu HMS table entries.
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id;
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name;
  {
    vector<string> kudu_table_names;
    RETURN_NOT_OK(kudu_client->ListTables(&kudu_table_names));
    for (const string& kudu_table_name : kudu_table_names) {
      shared_ptr<KuduTable> kudu_table;
      // TODO(dan): When the error is NotFound, prepend an admonishment about not
      // running this tool when the catalog is in-flux.
      RETURN_NOT_OK(kudu_client->OpenTable(kudu_table_name, &kudu_table));
      kudu_tables_by_id.emplace(kudu_table->id(), kudu_table);
      kudu_tables_by_name.emplace(kudu_table->name(), std::move(kudu_table));
    }
  }

  // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned
  // entries which reference non-existent Kudu tables, and group the rest by
  // table ID.
  vector<hive::Table> orphan_tables;
  unordered_map<string, vector<hive::Table>> hms_tables_by_id;
  {
    vector<hive::Table> hms_tables;
    RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
    for (hive::Table& hms_table : hms_tables) {
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      if (storage_handler == HmsClient::kKuduStorageHandler) {
        const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
        shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_id, hms_table_id);
        if (kudu_table) {
          hms_tables_by_id[(*kudu_table)->id()].emplace_back(std::move(hms_table));
        } else {
          orphan_tables.emplace_back(std::move(hms_table));
        }
      } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
        const string& hms_table_name = hms_table.parameters[HmsClient::kLegacyKuduTableNameKey];
        shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
        if (kudu_table) {
          hms_tables_by_id[(*kudu_table)->id()].emplace_back(std::move(hms_table));
        } else {
          orphan_tables.emplace_back(std::move(hms_table));
        }
      }
    }
  }

  // Step 3: Determine the state of each Kudu table's HMS entry(ies), and bin
  // them appropriately.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables;
  vector<shared_ptr<KuduTable>> missing_tables;
  vector<shared_ptr<KuduTable>> invalid_name_tables;
  for (auto& kudu_table_pair : kudu_tables_by_id) {
    shared_ptr<KuduTable> kudu_table = kudu_table_pair.second;
    vector<hive::Table>* hms_tables = FindOrNull(hms_tables_by_id, kudu_table_pair.first);

    if (!hms_tables) {
      const string& table_name = kudu_table->name();
      string normalized_table_name(table_name.data(), table_name.size());
      Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name);
      if (!s.ok()) {
        invalid_name_tables.emplace_back(std::move(kudu_table));
      } else {
        missing_tables.emplace_back(std::move(kudu_table));
      }
    } else if (hms_tables->size() == 1) {
      hive::Table& hms_table = (*hms_tables)[0];
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          !IsSynced(master_addrs, *kudu_table, hms_table)) {
        stale_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
      } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
        legacy_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
      }
    } else {
      for (hive::Table& hms_table : *hms_tables) {
        duplicate_tables.emplace_back(make_pair(kudu_table, std::move(hms_table)));
      }
    }
  }

  report->orphan_hms_tables.swap(orphan_tables);
  report->missing_hms_tables.swap(missing_tables);
  report->invalid_name_tables.swap(invalid_name_tables);
  report->legacy_hms_tables.swap(legacy_tables);
  report->duplicate_hms_tables.swap(duplicate_tables);
  report->inconsistent_tables.swap(stale_tables);
  return Status::OK();
}