in src/kudu/tools/tool_action_hms.cc [465:651]
Status FixHmsMetadata(const RunnerContext& context) {
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
string master_addrs;
RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs));
CatalogReport report;
RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report));
bool success = true;
if (FLAGS_drop_orphan_hms_tables) {
for (hive::Table& hms_table : report.orphan_hms_tables) {
string table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
const string& master_addrs_param = hms_table.parameters[HmsClient::kKuduMasterAddrsKey];
if (master_addrs_param != master_addrs && !FLAGS_force) {
LOG(INFO) << "Skipping drop of orphan HMS table " << table_name
<< " with master addresses parameter " << master_addrs_param
<< " because it does not match the --" << kMasterAddressesArg << " argument"
<< " (use --force to skip this check)";
continue;
}
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Dropping orphan HMS table " << table_name;
} else {
const string& table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
// All errors are fatal here, since we've already checked that the table exists in the HMS.
if (storage_handler == HmsClient::kKuduStorageHandler) {
RETURN_NOT_OK_PREPEND(hms_catalog->DropTable(table_id, table_name),
Substitute("failed to drop orphan HMS table $0", table_name));
} else {
RETURN_NOT_OK_PREPEND(hms_catalog->DropLegacyTable(table_name),
Substitute("failed to drop legacy orphan HMS table $0", table_name));
}
}
}
}
if (FLAGS_create_missing_hms_tables) {
for (const auto& kudu_table : report.missing_hms_tables) {
const string& table_id = kudu_table->id();
const string& table_name = kudu_table->name();
Schema schema = client::SchemaFromKuduSchema(kudu_table->schema());
string normalized_table_name(table_name.data(), table_name.size());
CHECK_OK(hms::HmsCatalog::NormalizeTableName(&normalized_table_name));
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << TableIdent(*kudu_table);
} else {
Status s = hms_catalog->CreateTable(table_id, table_name, boost::none, schema);
if (s.IsAlreadyPresent()) {
LOG(ERROR) << "Failed to create HMS table for Kudu table "
<< TableIdent(*kudu_table)
<< " because another table already exists in the HMS with that name";
success = false;
continue;
}
if (s.IsInvalidArgument()) {
// This most likely means the database doesn't exist, but it is ambiguous.
LOG(ERROR) << "Failed to create HMS table for Kudu table "
<< TableIdent(*kudu_table)
<< " (database does not exist?): " << s.message().ToString();
success = false;
continue;
}
// All other errors are unexpected.
RETURN_NOT_OK_PREPEND(s,
Substitute("failed to create HMS table for Kudu table $0", TableIdent(*kudu_table)));
}
if (normalized_table_name != table_name) {
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(*kudu_table)
<< " to lowercased Hive-compatible name: " << normalized_table_name;
} else {
// All errors are fatal. We never expect to get an 'AlreadyPresent'
// error, since the catalog manager validates that no two
// Hive-compatible table names differ only by case.
//
// Note that if an error occurs we do not roll-back the HMS table
// creation step, since a subsequent run of the tool will recognize
// the table as an inconsistent table (Kudu and HMS table names do not
// match), and automatically fix it.
RETURN_NOT_OK_PREPEND(
RenameTableInKuduCatalog(kudu_client.get(), table_name, normalized_table_name),
Substitute("failed to rename Kudu table $0 to lowercased Hive compatible name $1",
TableIdent(*kudu_table), normalized_table_name));
}
}
}
}
if (FLAGS_upgrade_hms_tables) {
for (const auto& table_pair : report.legacy_hms_tables) {
const KuduTable& kudu_table = *table_pair.first;
const hive::Table& hms_table = table_pair.second;
string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Upgrading legacy Impala HMS metadata for table "
<< hms_table_name;
} else {
RETURN_NOT_OK_PREPEND(hms_catalog->UpgradeLegacyImpalaTable(
kudu_table.id(), hms_table.dbName, hms_table.tableName,
client::SchemaFromKuduSchema(kudu_table.schema())),
Substitute("failed to upgrade legacy Impala HMS metadata for table $0",
hms_table_name));
}
if (kudu_table.name() != hms_table_name) {
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
<< " to " << hms_table_name;
} else {
Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name);
if (s.IsAlreadyPresent()) {
LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
<< " to match the Hive Metastore name " << hms_table_name
<< ", because a Kudu table with name" << hms_table_name
<< " already exists";
LOG(INFO) << "Suggestion: rename the conflicting table name manually:\n"
<< "\t$ kudu table rename_table --modify_external_catalogs=false "
<< master_addrs << " " << hms_table_name << " <database-name>.<table-name>'";
success = false;
continue;
}
// All other errors are fatal. Note that if an error occurs we do not
// roll-back the HMS legacy upgrade step, since a subsequent run of
// the tool will recognize the table as an inconsistent table (Kudu
// and HMS table names do not match), and automatically fix it.
RETURN_NOT_OK_PREPEND(s,
Substitute("failed to rename Kudu table $0 to $1",
TableIdent(kudu_table), hms_table_name));
}
}
}
}
if (FLAGS_fix_inconsistent_tables) {
for (const auto& table_pair : report.inconsistent_tables) {
const KuduTable& kudu_table = *table_pair.first;
const hive::Table& hms_table = table_pair.second;
string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
if (hms_table_name != kudu_table.name()) {
// Update the Kudu table name to match the HMS table name.
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
<< " to " << hms_table_name;
} else {
Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name);
if (s.IsAlreadyPresent()) {
LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
<< " to match HMS table " << hms_table_name
<< ", because a Kudu table with name " << hms_table_name
<< " already exists";
success = false;
continue;
}
RETURN_NOT_OK_PREPEND(s,
Substitute("failed to rename Kudu table $0 to $1",
TableIdent(kudu_table), hms_table_name));
}
}
// Update the HMS table metadata to match Kudu.
if (FLAGS_dryrun) {
LOG(INFO) << "[dryrun] Refreshing HMS table metadata for Kudu table "
<< TableIdent(kudu_table);
} else {
Schema schema(client::SchemaFromKuduSchema(kudu_table.schema()));
RETURN_NOT_OK_PREPEND(
hms_catalog->AlterTable(kudu_table.id(), hms_table_name, hms_table_name, schema),
Substitute("failed to refresh HMS table metadata for Kudu table $0",
TableIdent(kudu_table)));
}
}
}
if (FLAGS_dryrun || success) {
return Status::OK();
}
return Status::RuntimeError("Failed to fix some catalog metadata inconsistencies");
}