in src/kudu/tools/tool_action_hms.cc [276:371]
// Compares the tables known to the Kudu client against the Kudu table entries
// stored in the HMS and sorts every discrepancy into a category of 'report'.
//
// 'master_addrs' is only forwarded to IsSynced() when checking whether a
// non-legacy HMS entry matches its Kudu table. Any error returned while listing
// or opening Kudu tables, or while fetching the HMS entries, is passed through
// RETURN_NOT_OK. 'report' is written only at the very end, after every fallible
// call has succeeded.
Status AnalyzeCatalogs(const string& master_addrs,
                       HmsCatalog* hms_catalog,
                       KuduClient* kudu_client,
                       CatalogReport* report) {
  // Phase 1: open every Kudu table and index it twice. HMS entries using the
  // Kudu storage handler are matched through the ID index; legacy Impala/Kudu
  // entries are matched through the name index.
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id;
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name;
  {
    vector<string> names;
    RETURN_NOT_OK(kudu_client->ListTables(&names));
    for (const string& name : names) {
      shared_ptr<KuduTable> opened;
      // TODO(dan): When the error is NotFound, prepend an admonishment about not
      // running this tool when the catalog is in-flux.
      RETURN_NOT_OK(kudu_client->OpenTable(name, &opened));
      kudu_tables_by_id.emplace(opened->id(), opened);
      kudu_tables_by_name.emplace(opened->name(), std::move(opened));
    }
  }

  // Phase 2: fetch the Kudu table entries from the HMS. An entry whose Kudu
  // table cannot be found is an orphan; every other entry is grouped under the
  // ID of the Kudu table it resolved to.
  vector<hive::Table> orphan_tables;
  unordered_map<string, vector<hive::Table>> hms_tables_by_id;
  {
    vector<hive::Table> hms_entries;
    RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_entries));
    for (hive::Table& entry : hms_entries) {
      // NOTE(review): if 'parameters' is a std::map, operator[] adds an empty
      // value for a missing key, so these lookups may modify 'entry' -- confirm.
      const string& handler = entry.parameters[HmsClient::kStorageHandlerKey];

      shared_ptr<KuduTable>* match = nullptr;
      if (handler == HmsClient::kKuduStorageHandler) {
        const string& table_id = entry.parameters[HmsClient::kKuduTableIdKey];
        match = FindOrNull(kudu_tables_by_id, table_id);
      } else if (handler == HmsClient::kLegacyKuduStorageHandler) {
        const string& table_name = entry.parameters[HmsClient::kLegacyKuduTableNameKey];
        match = FindOrNull(kudu_tables_by_name, table_name);
      } else {
        // Neither storage handler: the entry is not binned anywhere.
        continue;
      }

      if (match) {
        hms_tables_by_id[(*match)->id()].emplace_back(std::move(entry));
      } else {
        orphan_tables.emplace_back(std::move(entry));
      }
    }
  }

  // Phase 3: classify each Kudu table by the HMS entries that resolved to it.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables;
  vector<shared_ptr<KuduTable>> missing_tables;
  vector<shared_ptr<KuduTable>> invalid_name_tables;
  for (auto& id_and_table : kudu_tables_by_id) {
    shared_ptr<KuduTable> kudu_table = id_and_table.second;
    vector<hive::Table>* entries = FindOrNull(hms_tables_by_id, id_and_table.first);

    // No HMS entry at all. Normalize a scratch copy of the name purely to find
    // out whether normalization succeeds; the Kudu table itself is untouched.
    if (!entries) {
      string normalized_name = kudu_table->name();
      if (hms::HmsCatalog::NormalizeTableName(&normalized_name).ok()) {
        missing_tables.emplace_back(std::move(kudu_table));
      } else {
        invalid_name_tables.emplace_back(std::move(kudu_table));
      }
      continue;
    }

    // Several HMS entries for one Kudu table: report each of them as a
    // duplicate, all sharing the same Kudu table handle.
    if (entries->size() != 1) {
      for (hive::Table& entry : *entries) {
        duplicate_tables.emplace_back(kudu_table, std::move(entry));
      }
      continue;
    }

    // Exactly one HMS entry: it is stale if it uses the Kudu storage handler
    // but IsSynced() rejects it, legacy if it uses the legacy handler, and
    // otherwise nothing is reported for it.
    hive::Table& entry = entries->front();
    const string& handler = entry.parameters[HmsClient::kStorageHandlerKey];
    const bool is_stale = handler == HmsClient::kKuduStorageHandler &&
                          !IsSynced(master_addrs, *kudu_table, entry);
    if (is_stale) {
      stale_tables.emplace_back(std::move(kudu_table), std::move(entry));
    } else if (handler == HmsClient::kLegacyKuduStorageHandler) {
      legacy_tables.emplace_back(std::move(kudu_table), std::move(entry));
    }
  }

  // Publish the results into the caller's report.
  report->orphan_hms_tables.swap(orphan_tables);
  report->missing_hms_tables.swap(missing_tables);
  report->invalid_name_tables.swap(invalid_name_tables);
  report->legacy_hms_tables.swap(legacy_tables);
  report->duplicate_hms_tables.swap(duplicate_tables);
  report->inconsistent_tables.swap(stale_tables);
  return Status::OK();
}