Status AnalyzeCatalogs()

in src/kudu/tools/tool_action_hms.cc [377:537]


// Compares the tables known to the Kudu client against the Kudu table entries
// returned by the HMS catalog and bins the discrepancies into 'report'.
//
// 'master_addrs' is a comma-separated list of Kudu master addresses; it is
// used to (optionally, see --ignore_other_clusters) skip HMS entries which
// name a disjoint set of masters, and is passed to IsSynced() when checking
// whether an HMS entry is stale.
//
// On success the following fields of 'report' are replaced:
//   orphan_hms_tables, missing_hms_tables, invalid_name_tables,
//   inconsistent_tables, legacy_hms_tables, duplicate_hms_tables.
//
// If non-null, 'kudu_catalog_count' and 'hms_catalog_count' receive the number
// of tables listed by the Kudu client and the number of Kudu table entries
// returned by the HMS catalog, respectively (the latter is counted before any
// filtering by master address).
//
// Returns the first non-OK status from listing/opening the Kudu tables or
// from fetching the HMS entries; in that case 'report' is left untouched.
Status AnalyzeCatalogs(const string& master_addrs,
                       HmsCatalog* hms_catalog,
                       KuduClient* kudu_client,
                       CatalogReport* report,
                       int* kudu_catalog_count = nullptr,
                       int* hms_catalog_count = nullptr) {
  // Step 1: retrieve all Kudu tables, and aggregate them by ID and by name. The
  // by-ID map will be used to match the HMS Kudu table entries. The by-name map
  // will be used to match against legacy Impala/Kudu HMS table entries.
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id;
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name;
  {
    vector<string> kudu_table_names;
    RETURN_NOT_OK(kudu_client->ListTables(&kudu_table_names));
    if (kudu_catalog_count) {
      *kudu_catalog_count = kudu_table_names.size();
    }
    for (const string& kudu_table_name : kudu_table_names) {
      shared_ptr<KuduTable> kudu_table;
      // TODO(dan): When the error is NotFound, prepend an admonishment about not
      // running this tool when the catalog is in-flux.
      RETURN_NOT_OK(kudu_client->OpenTable(kudu_table_name, &kudu_table));
      // The by-ID map gets a copy of the pointer; the by-name map takes over
      // the local one, so the move must come second.
      kudu_tables_by_id.emplace(kudu_table->id(), kudu_table);
      kudu_tables_by_name.emplace(kudu_table->name(), std::move(kudu_table));
    }
  }

  // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned
  // entries which reference non-existent Kudu tables, and group the rest by
  // table type and table ID.
  const set<string> kudu_master_addrs = Split(master_addrs, ",");
  vector<hive::Table> orphan_hms_tables;
  unordered_map<string, vector<hive::Table>> synchronized_hms_tables_by_id;
  unordered_map<string, vector<hive::Table>> external_hms_tables_by_id;
  {
    vector<hive::Table> hms_tables;
    RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
    if (hms_catalog_count) {
      *hms_catalog_count = hms_tables.size();
    }
    for (hive::Table& hms_table : hms_tables) {
      // If the addresses in the HMS entry don't overlap at all with the
      // expected addresses, the entry is likely from another Kudu cluster.
      // Ignore it if appropriate.
      if (FLAGS_ignore_other_clusters) {
        const string* hms_masters_field = FindOrNull(hms_table.parameters,
                                                    HmsClient::kKuduMasterAddrsKey);
        vector<string> master_intersection;
        if (hms_masters_field) {
          // Both operands are std::set, so they satisfy the sorted-range
          // precondition of std::set_intersection.
          const set<string> hms_master_addrs = Split(*hms_masters_field, ",");
          std::set_intersection(hms_master_addrs.begin(), hms_master_addrs.end(),
                                kudu_master_addrs.begin(), kudu_master_addrs.end(),
                                std::back_inserter(master_intersection));
        }
        // An entry with no masters field at all is also skipped here.
        if (master_intersection.empty()) {
          LOG(INFO) << Substitute("Skipping HMS table $0.$1 with different "
              "masters specified: $2", hms_table.dbName, hms_table.tableName,
              hms_masters_field ? *hms_masters_field : "<none>");
          continue;
        }
      }

      // If this is a non-legacy, synchronized table, we expect a table ID to exist
      // in the HMS entry; look up the Kudu table by ID. Otherwise, look it up
      // by table name.
      //
      // NOTE(review): operator[] presumably default-inserts an empty value for
      // a missing key (assuming 'parameters' is a std::map), so the entry that
      // ends up in the report may carry extra empty parameters -- confirm
      // whether downstream consumers care.
      shared_ptr<KuduTable>* kudu_table;
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
      const string& hms_table_name = hms_table.parameters[HmsClient::kKuduTableNameKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          HmsClient::IsSynchronized(hms_table)) {
        kudu_table = FindOrNull(kudu_tables_by_id, hms_table_id);
        // If there is no kudu table that matches the id, or the id doesn't exist,
        // lookup the table by name. This handles synchronized tables created when HMS
        // sync was off or tables with IDs out of sync.
        if (!kudu_table) {
          kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
        }
      } else {
        kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
      }

      if (kudu_table) {
        // Entries matched to a Kudu table which are neither synchronized nor
        // external are intentionally not tracked any further.
        if (HmsClient::IsSynchronized(hms_table)) {
          synchronized_hms_tables_by_id[(*kudu_table)->id()].emplace_back(
              std::move(hms_table));
        } else if (hms_table.tableType == HmsClient::kExternalTable) {
          external_hms_tables_by_id[(*kudu_table)->id()].emplace_back(
              std::move(hms_table));
        }
      } else if (HmsClient::IsSynchronized(hms_table)) {
        // Note: we only consider synchronized HMS table entries "orphans"
        // because unsynchronized external tables don't always point at valid
        // Kudu tables.
        orphan_hms_tables.emplace_back(std::move(hms_table));
      }
    }
  }

  // Step 3: Determine the state of each Kudu table's HMS entry(ies), and bin
  // them appropriately.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables;
  vector<shared_ptr<KuduTable>> missing_tables;
  vector<shared_ptr<KuduTable>> invalid_name_tables;
  for (auto& kudu_table_pair : kudu_tables_by_id) {
    // Deliberately a copy: the moves below consume this local, leaving the
    // map's own pointer intact for the external-table pass further down.
    shared_ptr<KuduTable> kudu_table = kudu_table_pair.second;
    // Check all of the synchronized HMS tables.
    vector<hive::Table>* hms_tables = FindOrNull(synchronized_hms_tables_by_id,
                                                 kudu_table_pair.first);
    // If there are no synchronized HMS table entries, this table is missing
    // HMS tables and might have an invalid table name.
    if (!hms_tables) {
      // Normalize a scratch copy of the name; only the resulting status is
      // used to decide which bin the table goes into.
      const string& table_name = kudu_table->name();
      string normalized_table_name(table_name.data(), table_name.size());
      Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name);
      if (!s.ok()) {
        invalid_name_tables.emplace_back(std::move(kudu_table));
      } else {
        missing_tables.emplace_back(std::move(kudu_table));
      }
    // If there is a single synchronized HMS table, this table could be unsynced or
    // using the legacy handler.
    } else if (hms_tables->size() == 1) {
      hive::Table& hms_table = (*hms_tables)[0];
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          !IsSynced(kudu_master_addrs, *kudu_table, hms_table)) {
        stale_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
      } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
        legacy_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
      }
    // Otherwise, there are multiple synchronized HMS tables for a single Kudu table.
    } else {
      for (hive::Table& hms_table : *hms_tables) {
        // The pointer is copied (not moved) because it is shared by every
        // duplicate entry.
        duplicate_tables.emplace_back(make_pair(kudu_table, std::move(hms_table)));
      }
    }
  }

  // Check all of the external HMS tables to see if they are using the legacy handler.
  for (auto& hms_table_pair : external_hms_tables_by_id) {
    shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_id, hms_table_pair.first);
    if (kudu_table) {
      for (hive::Table &hms_table : hms_table_pair.second) {
        const string &storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
        if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
          // Copy the shared_ptr rather than moving it out of the map: several
          // external HMS entries may reference the same Kudu table, and moving
          // would leave every entry after the first paired with a null table.
          legacy_tables.emplace_back(make_pair(*kudu_table, std::move(hms_table)));
        }
      }
    }
  }

  // Publish the results only once every step has succeeded.
  report->orphan_hms_tables.swap(orphan_hms_tables);
  report->missing_hms_tables.swap(missing_tables);
  report->invalid_name_tables.swap(invalid_name_tables);
  report->inconsistent_tables.swap(stale_tables);
  report->legacy_hms_tables.swap(legacy_tables);
  report->duplicate_hms_tables.swap(duplicate_tables);
  return Status::OK();
}