in src/kudu/tools/tool_action_hms.cc [377:537]
// Compares the tables known to Kudu against the Kudu table entries stored in
// the HMS and bins every discrepancy found into 'report'.
//
// Parameters:
//   master_addrs:       comma-separated Kudu master addresses. Used to skip HMS
//                       entries belonging to other clusters (only when
//                       FLAGS_ignore_other_clusters is set) and passed to
//                       IsSynced() when checking an entry for staleness.
//   hms_catalog:        source of the HMS Kudu table entries (GetKuduTables()).
//   kudu_client:        source of the Kudu tables (ListTables()/OpenTable()).
//   report:             output. On success its orphan_hms_tables,
//                       missing_hms_tables, invalid_name_tables,
//                       inconsistent_tables, legacy_hms_tables and
//                       duplicate_hms_tables fields are replaced with the
//                       results. It is not modified if an error is returned.
//   kudu_catalog_count: optional output; number of table names returned by
//                       ListTables().
//   hms_catalog_count:  optional output; number of entries returned by
//                       GetKuduTables(), counted before any filtering.
//
// Returns the first non-OK status produced by ListTables(), OpenTable() or
// GetKuduTables(); otherwise Status::OK().
Status AnalyzeCatalogs(const string& master_addrs,
                       HmsCatalog* hms_catalog,
                       KuduClient* kudu_client,
                       CatalogReport* report,
                       int* kudu_catalog_count = nullptr,
                       int* hms_catalog_count = nullptr) {
  // Step 1: retrieve all Kudu tables, and aggregate them by ID and by name. The
  // by-ID map will be used to match the HMS Kudu table entries. The by-name map
  // will be used to match against legacy Impala/Kudu HMS table entries.
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id;
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name;
  {
    vector<string> kudu_table_names;
    RETURN_NOT_OK(kudu_client->ListTables(&kudu_table_names));
    if (kudu_catalog_count) {
      *kudu_catalog_count = static_cast<int>(kudu_table_names.size());
    }
    for (const string& kudu_table_name : kudu_table_names) {
      shared_ptr<KuduTable> kudu_table;
      // TODO(dan): When the error is NotFound, prepend an admonishment about not
      // running this tool when the catalog is in-flux.
      RETURN_NOT_OK(kudu_client->OpenTable(kudu_table_name, &kudu_table));
      // Both maps share ownership of the same table object: the first emplace
      // copies the pointer, the second consumes the local.
      kudu_tables_by_id.emplace(kudu_table->id(), kudu_table);
      kudu_tables_by_name.emplace(kudu_table->name(), std::move(kudu_table));
    }
  }

  // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned
  // entries which reference non-existent Kudu tables, and group the rest by
  // table type and table ID.
  const set<string> kudu_master_addrs = Split(master_addrs, ",");
  vector<hive::Table> orphan_hms_tables;
  unordered_map<string, vector<hive::Table>> synchronized_hms_tables_by_id;
  unordered_map<string, vector<hive::Table>> external_hms_tables_by_id;
  {
    vector<hive::Table> hms_tables;
    RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
    if (hms_catalog_count) {
      *hms_catalog_count = static_cast<int>(hms_tables.size());
    }
    for (hive::Table& hms_table : hms_tables) {
      // If the addresses in the HMS entry don't overlap at all with the
      // expected addresses, the entry is likely from another Kudu cluster.
      // Ignore it if appropriate.
      if (FLAGS_ignore_other_clusters) {
        const string* hms_masters_field = FindOrNull(hms_table.parameters,
                                                     HmsClient::kKuduMasterAddrsKey);
        vector<string> master_intersection;
        if (hms_masters_field) {
          // std::set_intersection requires sorted input; both ranges come from
          // std::set, which iterates in sorted order.
          const set<string> hms_master_addrs = Split(*hms_masters_field, ",");
          std::set_intersection(hms_master_addrs.begin(), hms_master_addrs.end(),
                                kudu_master_addrs.begin(), kudu_master_addrs.end(),
                                std::back_inserter(master_intersection));
        }
        // An entry with no master addresses field at all is skipped as well.
        if (master_intersection.empty()) {
          LOG(INFO) << Substitute("Skipping HMS table $0.$1 with different "
              "masters specified: $2", hms_table.dbName, hms_table.tableName,
              hms_masters_field ? *hms_masters_field : "<none>");
          continue;
        }
      }

      // If this is a non-legacy, synchronized table, we expect a table ID to exist
      // in the HMS entry; look up the Kudu table by ID. Otherwise, look it up
      // by table name.
      //
      // NOTE(review): assuming 'parameters' is a std::map, operator[] inserts an
      // empty value for a missing key, so these lookups may add empty parameters
      // to the entry that later lands in the report. Kept as-is to preserve
      // behavior -- confirm whether downstream consumers care.
      shared_ptr<KuduTable>* kudu_table;
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
      const string& hms_table_name = hms_table.parameters[HmsClient::kKuduTableNameKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          HmsClient::IsSynchronized(hms_table)) {
        kudu_table = FindOrNull(kudu_tables_by_id, hms_table_id);
        // If there is no kudu table that matches the id, or the id doesn't exist,
        // lookup the table by name. This handles synchronized tables created when HMS
        // sync was off or tables with IDs out of sync.
        if (!kudu_table) {
          kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
        }
      } else {
        kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
      }

      if (kudu_table) {
        // Group by the ID of the matched Kudu table (not the ID recorded in the
        // HMS entry, which may be absent or out of date). Entries that are
        // neither synchronized nor external are dropped here.
        if (HmsClient::IsSynchronized(hms_table)) {
          synchronized_hms_tables_by_id[(*kudu_table)->id()].emplace_back(
              std::move(hms_table));
        } else if (hms_table.tableType == HmsClient::kExternalTable) {
          external_hms_tables_by_id[(*kudu_table)->id()].emplace_back(
              std::move(hms_table));
        }
      } else if (HmsClient::IsSynchronized(hms_table)) {
        // Note: we only consider synchronized HMS table entries "orphans"
        // because unsynchronized external tables don't always point at valid
        // Kudu tables.
        orphan_hms_tables.emplace_back(std::move(hms_table));
      }
    }
  }

  // Step 3: Determine the state of each Kudu table's HMS entry(ies), and bin
  // them appropriately.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables;
  vector<shared_ptr<KuduTable>> missing_tables;
  vector<shared_ptr<KuduTable>> invalid_name_tables;
  for (auto& kudu_table_pair : kudu_tables_by_id) {
    // Work on a copy of the pointer so that the map entry stays valid for the
    // external-table pass below.
    shared_ptr<KuduTable> kudu_table = kudu_table_pair.second;
    // Check all of the synchronized HMS tables.
    vector<hive::Table>* hms_tables = FindOrNull(synchronized_hms_tables_by_id,
                                                 kudu_table_pair.first);
    // If there are no synchronized HMS table entries, this table is missing
    // HMS tables and might have an invalid table name.
    if (!hms_tables) {
      const string& table_name = kudu_table->name();
      // NormalizeTableName() takes a pointer to the name, so hand it a scratch
      // copy; only the returned status is used here.
      string normalized_table_name(table_name.data(), table_name.size());
      Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name);
      if (!s.ok()) {
        invalid_name_tables.emplace_back(std::move(kudu_table));
      } else {
        missing_tables.emplace_back(std::move(kudu_table));
      }
    // If there is a single synchronized HMS table, this table could be unsynced or
    // using the legacy handler.
    } else if (hms_tables->size() == 1) {
      hive::Table& hms_table = (*hms_tables)[0];
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          !IsSynced(kudu_master_addrs, *kudu_table, hms_table)) {
        stale_tables.emplace_back(std::move(kudu_table), std::move(hms_table));
      } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
        legacy_tables.emplace_back(std::move(kudu_table), std::move(hms_table));
      }
    // Otherwise, there are multiple synchronized HMS tables for a single Kudu table.
    } else {
      for (hive::Table& hms_table : *hms_tables) {
        // The same Kudu table is paired with every duplicate, so copy the pointer.
        duplicate_tables.emplace_back(kudu_table, std::move(hms_table));
      }
    }
  }

  // Check all of the external HMS tables to see if they are using the legacy handler.
  for (auto& hms_table_pair : external_hms_tables_by_id) {
    const shared_ptr<KuduTable>* kudu_table =
        FindOrNull(kudu_tables_by_id, hms_table_pair.first);
    if (kudu_table) {
      for (hive::Table& hms_table : hms_table_pair.second) {
        const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
        if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
          // Copy (never move) the shared_ptr: it lives in 'kudu_tables_by_id'
          // and several external HMS entries may reference the same Kudu table.
          // Moving it would leave every entry after the first paired with a
          // null table.
          legacy_tables.emplace_back(*kudu_table, std::move(hms_table));
        }
      }
    }
  }

  // Publish the results only once the whole analysis has succeeded.
  report->orphan_hms_tables.swap(orphan_hms_tables);
  report->missing_hms_tables.swap(missing_tables);
  report->invalid_name_tables.swap(invalid_name_tables);
  report->inconsistent_tables.swap(stale_tables);
  report->legacy_hms_tables.swap(legacy_tables);
  report->duplicate_hms_tables.swap(duplicate_tables);
  return Status::OK();
}