in src/kudu/tools/tool_action_hms.cc [631:877]
// Repairs metadata inconsistencies between the Kudu master catalog and the
// Hive Metastore (HMS), as discovered by AnalyzeCatalogs(). Each category of
// fix is gated by its own gflag (--drop_orphan_hms_tables,
// --create_missing_hms_tables, --upgrade_hms_tables,
// --fix_inconsistent_tables); with --dryrun the intended action is logged
// instead of performed.
//
// Recoverable per-table failures (e.g. name conflicts) are logged, recorded,
// and skipped so the remaining tables are still processed; if any occurred
// (and this is not a dry run), RuntimeError is returned at the end.
// Unexpected errors abort immediately via RETURN_NOT_OK*.
Status FixHmsMetadata(const RunnerContext& context) {
  shared_ptr<KuduClient> kudu_client;
  unique_ptr<HmsCatalog> hms_catalog;
  string master_addrs;
  RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs));

  CatalogReport report;
  int kudu_catalog_count = 0;
  int hms_catalog_count = 0;
  RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report,
                                &kudu_catalog_count, &hms_catalog_count));

  if (FLAGS_dryrun && kudu_catalog_count == 0) {
    LOG(INFO) << "NOTE: There are zero kudu tables listed. If the cluster indeed has kudu tables "
                 "please re-run the command with right credentials." << endl;
  }

  // Set to false on any recoverable per-table failure; checked at the end.
  bool success = true;

  if (FLAGS_drop_orphan_hms_tables) {
    for (hive::Table& hms_table : report.orphan_hms_tables) {
      string table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
      const string& master_addrs_param = hms_table.parameters[HmsClient::kKuduMasterAddrsKey];
      // Normalize the master addresses to allow for an equality check that ignores
      // missing default ports, duplicate addresses, and address order.
      UnorderedHostPortSet param_set;
      RETURN_NOT_OK(MasterAddressesToSet(master_addrs_param, &param_set));
      UnorderedHostPortSet cluster_set;
      RETURN_NOT_OK(MasterAddressesToSet(master_addrs, &cluster_set));
      if (param_set != cluster_set && !FLAGS_force) {
        // The orphan table appears to belong to a different Kudu cluster;
        // refuse to drop it unless the user explicitly overrides the check.
        LOG(INFO) << "Skipping drop of orphan HMS table " << table_name
                  << " with master addresses parameter " << master_addrs_param
                  << " because it does not match the --" << kMasterAddressesArg << " argument"
                  << " (use --force to skip this check)";
        continue;
      }

      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Dropping orphan HMS table " << table_name;
      } else {
        const string& table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
        const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
        // All errors are fatal here, since we've already checked that the table exists in the HMS.
        if (storage_handler == HmsClient::kKuduStorageHandler) {
          RETURN_NOT_OK_PREPEND(hms_catalog->DropTable(table_id, table_name),
              Substitute("failed to drop orphan HMS table $0", table_name));
        } else {
          RETURN_NOT_OK_PREPEND(hms_catalog->DropLegacyTable(table_name),
              Substitute("failed to drop legacy orphan HMS table $0", table_name));
        }
      }
    }
  }

  if (FLAGS_create_missing_hms_tables) {
    for (const auto& kudu_table : report.missing_hms_tables) {
      const string& table_id = kudu_table->id();
      const string& cluster_id = kudu_table->client()->cluster_id();
      const string& table_name = kudu_table->name();
      auto schema = KuduSchema::ToSchema(kudu_table->schema());
      // Deep-copy the name before normalizing it in place.
      string normalized_table_name(table_name.data(), table_name.size());
      CHECK_OK(hms::HmsCatalog::NormalizeTableName(&normalized_table_name));

      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << TableIdent(*kudu_table);
      } else {
        Status s = hms_catalog->CreateTable(table_id, table_name, cluster_id,
                                            kudu_table->owner(), schema, kudu_table->comment());
        if (s.IsAlreadyPresent()) {
          // Name collision in the HMS: recoverable; report and move on.
          LOG(ERROR) << "Failed to create HMS table for Kudu table "
                     << TableIdent(*kudu_table)
                     << " because another table already exists in the HMS with that name";
          success = false;
          continue;
        }
        if (s.IsInvalidArgument()) {
          // This most likely means the database doesn't exist, but it is ambiguous.
          LOG(ERROR) << "Failed to create HMS table for Kudu table "
                     << TableIdent(*kudu_table)
                     << " (database does not exist?): " << s.message().ToString();
          success = false;
          continue;
        }
        // All other errors are unexpected.
        RETURN_NOT_OK_PREPEND(s,
            Substitute("failed to create HMS table for Kudu table $0", TableIdent(*kudu_table)));
      }

      if (normalized_table_name != table_name) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(*kudu_table)
                    << " to lowercased Hive-compatible name: " << normalized_table_name;
        } else {
          // All errors are fatal. We never expect to get an 'AlreadyPresent'
          // error, since the catalog manager validates that no two
          // Hive-compatible table names differ only by case.
          //
          // Note that if an error occurs we do not roll-back the HMS table
          // creation step, since a subsequent run of the tool will recognize
          // the table as an inconsistent table (Kudu and HMS table names do not
          // match), and automatically fix it.
          RETURN_NOT_OK_PREPEND(
              RenameTableInKuduCatalog(kudu_client.get(), table_name, normalized_table_name),
              Substitute("failed to rename Kudu table $0 to lowercased Hive compatible name $1",
                         TableIdent(*kudu_table), normalized_table_name));
        }
      }
    }
  }

  if (FLAGS_upgrade_hms_tables) {
    for (const auto& table_pair : report.legacy_hms_tables) {
      const KuduTable& kudu_table = *table_pair.first;
      const hive::Table& hms_table = table_pair.second;
      string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);

      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Upgrading legacy Impala HMS metadata for table "
                  << hms_table_name;
      } else {
        RETURN_NOT_OK_PREPEND(hms_catalog->UpgradeLegacyImpalaTable(
                kudu_table.id(), kudu_table.client()->cluster_id(), hms_table.dbName,
                hms_table.tableName, KuduSchema::ToSchema(kudu_table.schema()),
                kudu_table.comment()),
            Substitute("failed to upgrade legacy Impala HMS metadata for table $0",
                       hms_table_name));
      }

      // For synchronized tables, make the Kudu-side name match the HMS name.
      if (HmsClient::IsSynchronized(hms_table) && kudu_table.name() != hms_table_name) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
                    << " to " << hms_table_name;
        } else {
          Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(),
                                              hms_table_name);
          if (s.IsAlreadyPresent()) {
            // Recoverable name conflict on the Kudu side; suggest a manual fix.
            LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
                       << " to match the Hive Metastore name " << hms_table_name
                       << ", because a Kudu table with name " << hms_table_name
                       << " already exists";
            LOG(INFO) << "Suggestion: rename the conflicting table name manually:\n"
                      << "\t$ kudu table rename_table --modify_external_catalogs=false "
                      << master_addrs << " " << hms_table_name << " <database-name>.<table-name>";
            success = false;
            continue;
          }

          // All other errors are fatal. Note that if an error occurs we do not
          // roll-back the HMS legacy upgrade step, since a subsequent run of
          // the tool will recognize the table as an inconsistent table (Kudu
          // and HMS table names do not match), and automatically fix it.
          RETURN_NOT_OK_PREPEND(s,
              Substitute("failed to rename Kudu table $0 to $1",
                         TableIdent(kudu_table), hms_table_name));
        }
      }
    }
  }

  if (FLAGS_fix_inconsistent_tables) {
    for (const auto& table_pair : report.inconsistent_tables) {
      const KuduTable& kudu_table = *table_pair.first;
      const hive::Table& hms_table = table_pair.second;
      string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
      string owner = kudu_table.owner();
      string comment = kudu_table.comment();

      if (hms_table_name != kudu_table.name()) {
        // Update the Kudu table name to match the HMS table name.
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
                    << " to " << hms_table_name;
        } else {
          Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(),
                                              hms_table_name);
          if (s.IsAlreadyPresent()) {
            LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
                       << " to match HMS table " << hms_table_name
                       << ", because a Kudu table with name " << hms_table_name
                       << " already exists";
            success = false;
            continue;
          }
          RETURN_NOT_OK_PREPEND(s,
              Substitute("failed to rename Kudu table $0 to $1",
                         TableIdent(kudu_table), hms_table_name));
        }
      }

      // If the HMS table has an owner and Kudu does not, update the Kudu table owner to match
      // the HMS table owner. Otherwise the metadata step below will ensure the Kudu owner
      // is updated in the HMS.
      if (hms_table.owner != owner && owner.empty()) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Changing owner of " << TableIdent(kudu_table)
                    << " to " << hms_table.owner << " in Kudu catalog.";
        } else {
          RETURN_NOT_OK_PREPEND(
              ChangeOwnerInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table.owner),
              Substitute("failed to change owner of $0 to $1 in Kudu catalog",
                         TableIdent(kudu_table), hms_table.owner));
          owner = hms_table.owner;
        }
      }

      // If the HMS table has a table comment and Kudu does not, update the Kudu table comment
      // to match the HMS table comment. Otherwise the metadata step below will ensure the Kudu
      // comment is updated in the HMS.
      static const string kDefaultValue = "";
      const string& hms_table_comment = FindWithDefault(
          hms_table.parameters, hms::HmsClient::kTableCommentKey, kDefaultValue);
      if (hms_table_comment != comment && comment.empty()) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Changing table comment for " << TableIdent(kudu_table)
                    << " to " << hms_table_comment << " in Kudu catalog.";
        } else {
          RETURN_NOT_OK_PREPEND(ChangeTableCommentInKuduCatalog(kudu_client.get(),
                  kudu_table.name(), hms_table_comment),
              Substitute("failed to change table comment of $0 to $1 in Kudu catalog",
                         TableIdent(kudu_table), hms_table_comment));
          comment = hms_table_comment;
        }
      }

      // Update the HMS table metadata to match Kudu.
      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Refreshing HMS table metadata for Kudu table "
                  << TableIdent(kudu_table);
      } else {
        auto schema = KuduSchema::ToSchema(kudu_table.schema());
        RETURN_NOT_OK_PREPEND(
            // Disable table ID checking to support fixing tables with unsynchronized IDs.
            hms_catalog->AlterTable(kudu_table.id(), hms_table_name, hms_table_name,
                                    kudu_table.client()->cluster_id(), owner, schema,
                                    comment, /* check_id */ false),
            Substitute("failed to refresh HMS table metadata for Kudu table $0",
                       TableIdent(kudu_table)));
      }
    }
  }

  LOG(INFO) << Substitute("Number of Kudu tables found in Kudu master catalog: $0",
                          kudu_catalog_count) << endl;
  LOG(INFO) << Substitute("Number of Kudu tables found in HMS catalog: $0", hms_catalog_count)
            << endl;

  // A dry run never fails; otherwise report whether every fix succeeded.
  if (FLAGS_dryrun || success) {
    return Status::OK();
  }
  return Status::RuntimeError("Failed to fix some catalog metadata inconsistencies");
}