in src/kudu/master/catalog_manager.cc [3547:4005]
// Applies the modifications described by 'req' to an existing table.
//
// Supported modifications: schema steps (ADD/DROP/RENAME/ALTER_COLUMN), range
// partitioning steps (ADD/DROP_RANGE_PARTITION), table rename, owner change,
// comment change, replication factor change, extra configuration properties,
// and table write limits (disk size / row count). Table write limit changes
// may not be combined with any other alteration.
//
// The alterations are applied to the table's dirty metadata under a WRITE
// lock, persisted to the system catalog in a single write, and only then
// committed to the in-memory state. After that, SendAlterTableRequest() is
// invoked for the table and SendDeleteTabletRequest() for each dropped tablet.
//
// Parameters:
//   req - the alter request.
//   resp - the response; receives the table ID. Most failure paths record
//       the error in 'resp' via SetupError().
//   hms_notification_log_event_id - forwarded as-is to the sys-catalog write
//       (SysCatalogTable::Actions::hms_notification_log_event_id).
//   user - the requesting user, if any; passed to the authorization step.
//
// The caller must hold the leader lock for reading.
Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
                                  AlterTableResponsePB* resp,
                                  optional<int64_t> hms_notification_log_event_id,
                                  const optional<string>& user) {
  leader_lock_.AssertAcquiredForReading();

  // 1. Group the steps into schema altering steps and partition altering steps.
  vector<AlterTableRequestPB::Step> alter_schema_steps;
  vector<AlterTableRequestPB::Step> alter_partitioning_steps;
  for (const auto& step : req.alter_schema_steps()) {
    switch (step.type()) {
      case AlterTableRequestPB::ADD_COLUMN:
      case AlterTableRequestPB::DROP_COLUMN:
      case AlterTableRequestPB::RENAME_COLUMN:
      case AlterTableRequestPB::ALTER_COLUMN: {
        alter_schema_steps.emplace_back(step);
        break;
      }
      case AlterTableRequestPB::ADD_RANGE_PARTITION:
      case AlterTableRequestPB::DROP_RANGE_PARTITION: {
        alter_partitioning_steps.emplace_back(step);
        break;
      }
      case AlterTableRequestPB::UNKNOWN: {
        return Status::InvalidArgument("Invalid alter step type", SecureShortDebugString(step));
      }
    }
  }

  // Pre-check the modifications' validity:
  // Alterations done by admin should not be combined with other table alterations.
  bool table_limit_change = req.has_disk_size_limit() ||
                            req.has_row_count_limit();
  bool other_schema_change = req.has_new_table_name() ||
                             req.has_new_table_owner() ||
                             !req.new_extra_configs().empty() ||
                             !alter_schema_steps.empty() ||
                             !alter_partitioning_steps.empty();
  if (table_limit_change && !FLAGS_enable_table_write_limit) {
    return SetupError(Status::NotSupported(
                          "altering table limit is not supported because "
                          "--enable_table_write_limit is not enabled"),
                      resp, MasterErrorPB::UNKNOWN_ERROR);
  }
  if (table_limit_change && other_schema_change) {
    return SetupError(Status::ConfigurationError(
                          "altering table limit cannot be combined with other alterations"),
                      resp, MasterErrorPB::UNKNOWN_ERROR);
  }

  // 2. Lookup the table, verify if it exists, lock it for modification, and then
  //    checks that the user is authorized to operate on the table.
  scoped_refptr<TableInfo> table;
  TableMetadataLock l;
  auto authz_func = [&] (const string& username,
                         const string& table_name,
                         const string& owner) {
    const string new_table = req.has_new_table_name() ?
        NormalizeTableName(req.new_table_name()) : table_name;
    // Change owner requires higher level of privilege (ALL WITH GRANT OPTION,
    // or ALL + delegate admin) than other types of alter operations, so if a
    // single alter contains an owner change as well as other changes, it's
    // sufficient to authorize only the owner change.
    if (req.has_new_table_owner()) {
      return SetupError(authz_provider_->AuthorizeChangeOwner(table_name, username,
                                                              username == owner),
                        resp, MasterErrorPB::NOT_AUTHORIZED);
    }
    if (req.has_disk_size_limit() || req.has_row_count_limit()) {
      // Table limit is used to stop writing from the table owner,
      // so, the owner is disallowed to change the table limit.
      if (user && !master_->IsServiceUserOrSuperUser(*user)) {
        return SetupError(
            Status::NotAuthorized("must be a service user or "
                                  "a super user to modify table limit"),
            resp, MasterErrorPB::NOT_AUTHORIZED);
      }
    }
    return SetupError(authz_provider_->AuthorizeAlterTable(table_name, new_table, username,
                                                           username == owner),
                      resp, MasterErrorPB::NOT_AUTHORIZED);
  };
  RETURN_NOT_OK(FindLockAndAuthorizeTable(
      req, resp, LockMode::WRITE, authz_func, user, &table, &l));
  if (l.data().is_deleted()) {
    return SetupError(
        Status::NotFound("the table was deleted", l.data().pb.state_msg()),
        resp, MasterErrorPB::TABLE_NOT_FOUND);
  }

  l.mutable_data()->pb.set_alter_timestamp(time(nullptr));
  string normalized_table_name = NormalizeTableName(l.data().name());
  *resp->mutable_table_id() = table->id();

  // Set the table limit. TableInfo::TABLE_WRITE_DEFAULT_LIMIT clears the
  // per-table limit; any other negative value is rejected.
  if (table_limit_change) {
    if (req.has_disk_size_limit()) {
      if (req.disk_size_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
        l.mutable_data()->pb.clear_table_disk_size_limit();
        LOG(INFO) << Substitute("Resetting table $0 disk_size_limit to the default setting",
                                normalized_table_name);
      } else if (req.disk_size_limit() >= 0) {
        l.mutable_data()->pb.set_table_disk_size_limit(req.disk_size_limit());
        LOG(INFO) << Substitute("Setting table $0 disk_size_limit to $1",
                                normalized_table_name, req.disk_size_limit());
      } else {
        return SetupError(Status::InvalidArgument("disk size limit must "
                                                  "be greater than or equal to -1"),
                          resp, MasterErrorPB::UNKNOWN_ERROR);
      }
    }
    if (req.has_row_count_limit()) {
      if (req.row_count_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
        l.mutable_data()->pb.clear_table_row_count_limit();
        LOG(INFO) << Substitute("Resetting table $0 row_count_limit to the default setting",
                                normalized_table_name);
      } else if (req.row_count_limit() >= 0) {
        l.mutable_data()->pb.set_table_row_count_limit(req.row_count_limit());
        LOG(INFO) << Substitute("Setting table $0 row_count_limit to $1",
                                normalized_table_name, req.row_count_limit());
      } else {
        return SetupError(Status::InvalidArgument("row count limit must "
                                                  "be greater than or equal to -1"),
                          resp, MasterErrorPB::UNKNOWN_ERROR);
      }
    }
  }

  // 3. Calculate and validate new schema for the on-disk state, not persisted yet.
  Schema new_schema;
  ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());

  // Apply the alter steps. Note that there may be no steps, in which case this
  // is essentially a no-op. It's still important to execute because
  // ApplyAlterSchemaSteps populates 'new_schema', which is used below.
  TRACE("Apply alter schema");
  // KUDU-3577: 'needs_range_bounds_refresh' reflects whether it's necessary
  // to re-encode information on partition boundaries for ranges with custom
  // hash schemas in the system catalog
  bool needs_range_bounds_refresh = false;
  RETURN_NOT_OK(SetupError(
      ApplyAlterSchemaSteps(l.data().pb,
                            alter_schema_steps,
                            &new_schema,
                            &next_col_id,
                            &needs_range_bounds_refresh),
      resp, MasterErrorPB::INVALID_SCHEMA));
  DCHECK_NE(next_col_id, 0);
  DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
            static_cast<int>(Schema::kColumnNotFound));

  // Just validate the schema, not the name, owner, or comment (validated below).
  RETURN_NOT_OK(SetupError(
      ValidateClientSchema(nullopt, nullopt, nullopt, new_schema),
      resp, MasterErrorPB::INVALID_SCHEMA));

  // 4. Validate and try to acquire the new table name.
  string normalized_new_table_name = NormalizeTableName(req.new_table_name());
  if (req.has_new_table_name()) {
    // Validate the new table name.
    RETURN_NOT_OK(SetupError(
        ValidateIdentifier(req.new_table_name()).CloneAndPrepend("invalid table name"),
        resp, MasterErrorPB::INVALID_SCHEMA));

    std::lock_guard catalog_lock(lock_);
    TRACE("Acquired catalog manager lock");

    // Verify that a table does not already exist with the new name. This
    // also disallows no-op renames (ALTER TABLE a RENAME TO a).
    //
    // Special case: if this is a rename of a table from a non-normalized to
    // normalized name (ALTER TABLE A RENAME to a), then allow it.
    scoped_refptr<TableInfo> other_table = FindPtrOrNull(normalized_table_names_map_,
                                                         normalized_new_table_name);
    if (other_table &&
        !(table.get() == other_table.get() && l.data().name() != normalized_new_table_name)) {
      return SetupError(
          Status::AlreadyPresent(Substitute("table $0 already exists with id $1",
              normalized_new_table_name, other_table->id())),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }

    // Reserve the new table name if possible.
    if (!InsertIfNotPresent(&reserved_normalized_table_names_, normalized_new_table_name)) {
      // ServiceUnavailable will cause the client to retry the alter table
      // request. We don't want to outright fail the request with
      // 'AlreadyPresent', because a table name reservation can be rolled back
      // in the case of an error. Instead, we force the client to retry at a
      // later time.
      return SetupError(Status::ServiceUnavailable(Substitute(
          "table name $0 is already reserved", normalized_new_table_name)),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }

    l.mutable_data()->pb.set_name(normalized_new_table_name);
  }

  // Ensure that we drop our reservation upon return. This is registered only
  // after the reservation above has succeeded (every earlier failure path
  // returns before reaching this point), so the erase must find the name.
  SCOPED_CLEANUP({
    if (req.has_new_table_name()) {
      std::lock_guard name_lock(lock_);
      CHECK_EQ(1, reserved_normalized_table_names_.erase(normalized_new_table_name));
    }
  });

  // 5. Alter the table owner.
  if (req.has_new_table_owner()) {
    RETURN_NOT_OK(SetupError(
        ValidateOwner(req.new_table_owner()).CloneAndAppend("invalid owner name"),
        resp, MasterErrorPB::INVALID_SCHEMA));
    l.mutable_data()->pb.set_owner(req.new_table_owner());
  }

  // 6. Alter the table comment.
  if (req.has_new_table_comment()) {
    RETURN_NOT_OK(SetupError(
        ValidateTableComment(req.new_table_comment()).CloneAndPrepend("invalid table comment"),
        resp, MasterErrorPB::INVALID_SCHEMA));
    l.mutable_data()->pb.set_comment(req.new_table_comment());
  }

  // 7. Alter table partitioning. This may also modify the table's partition
  //    schema in place, which is reported via 'partition_schema_updated'.
  vector<scoped_refptr<TabletInfo>> tablets_to_add;
  vector<scoped_refptr<TabletInfo>> tablets_to_drop;
  bool partition_schema_updated = false;
  if (!alter_partitioning_steps.empty()) {
    TRACE("Apply alter partitioning");
    Schema client_schema;
    RETURN_NOT_OK(SetupError(SchemaFromPB(req.schema(), &client_schema),
        resp, MasterErrorPB::UNKNOWN_ERROR));
    RETURN_NOT_OK(SetupError(ApplyAlterPartitioningSteps(
        table, client_schema, alter_partitioning_steps, &l,
        &tablets_to_add, &tablets_to_drop, &partition_schema_updated),
        resp, MasterErrorPB::UNKNOWN_ERROR));
  }

  // 8. Alter table's replication factor.
  bool num_replicas_changed = false;
  if (req.has_num_replicas()) {
    int num_replicas = req.num_replicas();
    RETURN_NOT_OK(ValidateNumberReplicas(normalized_table_name,
                                         resp, ValidateType::kAlterTable,
                                         nullopt, num_replicas));
    if (num_replicas != l.data().pb.num_replicas()) {
      num_replicas_changed = true;
      l.mutable_data()->pb.set_num_replicas(num_replicas);
    }
  }

  // 9. Alter table's extra configuration properties.
  if (!req.new_extra_configs().empty()) {
    TRACE("Apply alter extra-config");
    Map<string, string> new_extra_configs;
    RETURN_NOT_OK(ExtraConfigPBToPBMap(l.data().pb.extra_config(),
                                       &new_extra_configs));
    // Merge table's extra configuration properties: values from the request
    // overwrite existing ones with the same key.
    for (const auto& config : req.new_extra_configs()) {
      new_extra_configs[config.first] = config.second;
    }
    RETURN_NOT_OK(ExtraConfigPBFromPBMap(new_extra_configs,
                                         l.mutable_data()->pb.mutable_extra_config()));
  }

  // Set to true if columns are altered, added or dropped.
  const bool has_schema_changes = !alter_schema_steps.empty();
  // Set to true if there are schema changes, the table is renamed,
  // or if any other table properties changed.
  const bool has_metadata_changes = has_schema_changes ||
      req.has_new_table_name() || req.has_new_table_owner() ||
      !req.new_extra_configs().empty() || req.has_disk_size_limit() ||
      req.has_row_count_limit() || req.has_new_table_comment() ||
      num_replicas_changed;
  // Set to true if there are partitioning changes.
  const bool has_partitioning_changes = !alter_partitioning_steps.empty() ||
      partition_schema_updated;
  // Set to true if metadata changes need to be applied to existing tablets,
  // i.e. at least one tablet of the table survives this alteration.
  const bool has_metadata_changes_for_existing_tablets =
      has_metadata_changes && table->num_tablets() > tablets_to_drop.size();

  // Skip empty requests...
  if (!has_metadata_changes && !has_partitioning_changes) {
    return Status::OK();
  }

  // 10. Serialize the schema and increment the version number.
  if (needs_range_bounds_refresh) {
    // KUDU-3577: if necessary, re-encode the information on partition
    // boundaries for ranges with custom hash schemas
    Schema cur_schema; // the current table schema before being altered
    RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &cur_schema));
    PartitionSchema partition_schema;
    RETURN_NOT_OK(PartitionSchema::FromPB(
        l.data().pb.partition_schema(), cur_schema, &partition_schema));
    RETURN_NOT_OK(partition_schema.ToPB(
        new_schema, l.mutable_data()->pb.mutable_partition_schema()));
  }
  if (has_metadata_changes_for_existing_tablets && !l.data().pb.has_fully_applied_schema()) {
    l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
  }
  if (has_schema_changes) {
    CHECK_OK(SchemaToPB(new_schema, l.mutable_data()->pb.mutable_schema()));
  }
  if (has_metadata_changes) {
    l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
    l.mutable_data()->pb.set_next_column_id(next_col_id);
  }
  if (!tablets_to_add.empty() || has_metadata_changes_for_existing_tablets) {
    // If some tablet schemas need to be updated or there are any new tablets,
    // set the table state to ALTERING, so that IsAlterTableDone RPCs will wait
    // for the schema updates and tablets to be running.
    l.mutable_data()->set_state(SysTablesEntryPB::ALTERING,
                                Substitute("Alter Table version=$0 ts=$1",
                                           l.mutable_data()->pb.version(),
                                           LocalTimeAsString()));
  }

  const time_t timestamp = time(nullptr);
  const string deletion_msg = "Partition dropped at " + TimestampAsString(timestamp);
  TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
  TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);

  // The table's persistent metadata is dirty (and committed in-memory below)
  // under exactly this condition. It must be used both for deciding whether
  // to write the table entry to the sys catalog and whether to commit the
  // in-memory state; otherwise, e.g. a partition schema update caused by
  // dropping a range with a custom hash schema would be committed in memory
  // but never persisted, and would be lost on master restart/failover.
  const bool table_metadata_modified =
      !tablets_to_add.empty() || has_metadata_changes || partition_schema_updated;

  // 11. Update sys-catalog with the new table schema and tablets to add/drop.
  TRACE("Updating metadata on disk");
  {
    SysCatalogTable::Actions actions;
    actions.hms_notification_log_event_id =
        std::move(hms_notification_log_event_id);
    if (table_metadata_modified) {
      // If anything modified the table's persistent metadata, then sync it to the sys catalog.
      actions.table_to_update = table;
    }
    actions.tablets_to_add = tablets_to_add;

    tablets_to_add_lock.AddMutableInfos(tablets_to_add);
    tablets_to_drop_lock.AddMutableInfos(tablets_to_drop);
    tablets_to_drop_lock.Lock(LockMode::WRITE);
    for (auto& tablet : tablets_to_drop) {
      tablet->mutable_metadata()->mutable_dirty()->set_state(
          SysTabletsEntryPB::DELETED, deletion_msg);
      tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
    }
    actions.tablets_to_update = tablets_to_drop;

    Status s = sys_catalog_->Write(std::move(actions));
    if (PREDICT_FALSE(!s.ok())) {
      s = s.CloneAndPrepend("an error occurred while updating the sys-catalog");
      LOG(WARNING) << s.ToString();
      CheckIfNoLongerLeaderAndSetupError(s, resp);
      return s;
    }
  }

  // 12. Commit the in-memory state.
  TRACE("Committing alterations to in-memory state");
  {
    // Commit new tablet in-memory state. This doesn't require taking the global
    // lock since the new tablets are not yet visible, because they haven't been
    // added to the table or tablet index.
    tablets_to_add_lock.Commit();

    // Take the global catalog manager lock in order to modify the global table
    // and tablets indices.
    std::lock_guard lock(lock_);
    if (req.has_new_table_name()) {
      if (normalized_table_names_map_.erase(normalized_table_name) != 1) {
        LOG(FATAL) << "Could not remove table " << table->ToString()
                   << " from map in response to AlterTable request: "
                   << SecureShortDebugString(req);
      }
      InsertOrDie(&normalized_table_names_map_, normalized_new_table_name, table);
      // Alter the table name in the attributes of the metrics.
      table->UpdateMetricsAttrs(normalized_new_table_name);
    }

    // Insert new tablets into the global tablet map. After this, the tablets
    // will be visible in GetTabletLocations RPCs.
    for (const auto& tablet : tablets_to_add) {
      InsertOrDie(&tablet_map_, tablet->id(), tablet);
    }
  }

  // Add and remove new tablets from the table. This makes the tablets visible
  // to GetTableLocations RPCs. This doesn't need to happen under the global
  // lock, since:
  //  * clients can not know the new tablet IDs, so GetTabletLocations RPCs
  //    are impossible.
  //  * the new tablets can not heartbeat yet, since they don't get created
  //    until further down.
  //
  // We acquire new READ locks for tablets_to_add because we've already
  // committed our WRITE locks above, and reordering the operations such that
  // the WRITE locks could be reused would open a short window wherein
  // uninitialized tablet state is published to the world.
  for (const auto& tablet : tablets_to_add) {
    tablet->metadata().ReadLock();
  }
  table->AddRemoveTablets(tablets_to_add, tablets_to_drop);
  for (const auto& tablet : tablets_to_add) {
    tablet->metadata().ReadUnlock();
  }

  // Commit state change for dropped tablets. This comes after removing the
  // tablets from their associated tables so that if a GetTableLocations or
  // GetTabletLocations returns a deleted tablet, the retry will never include
  // the tablet again.
  tablets_to_drop_lock.Commit();

  // If there are schema changes or the owner or comment changed, then update the
  // entry in the Hive Metastore. This is done on a best-effort basis, since Kudu
  // is the source of truth for table schema information, and the table has already
  // been altered in the Kudu catalog via the successful sys-table write above.
  if (hms_catalog_ && (has_schema_changes ||
      req.has_new_table_owner() || req.has_new_table_comment())) {
    // Sanity check: if there are schema changes then this is necessarily not a
    // table rename, since we split out the rename portion into its own
    // 'transaction' which is serialized through the HMS.
    DCHECK(!req.has_new_table_name());
    auto s = hms_catalog_->AlterTable(
        table->id(), normalized_table_name, normalized_table_name,
        GetClusterId(), l.mutable_data()->owner(), new_schema, l.mutable_data()->comment());
    if (PREDICT_TRUE(s.ok())) {
      LOG(INFO) << Substitute(
          "altered HMS schema for table $0", table->ToString());
    } else {
      LOG(WARNING) << Substitute(
          "failed to alter HMS schema for table $0, "
          "HMS schema information will be stale: $1",
          table->ToString(), s.ToString());
    }
  }

  if (table_metadata_modified) {
    l.Commit();
  } else {
    l.Unlock();
  }

  SendAlterTableRequest(table);
  for (const auto& tablet : tablets_to_drop) {
    TabletMetadataLock tablet_lock(tablet.get(), LockMode::READ);
    SendDeleteTabletRequest(tablet, tablet_lock, deletion_msg);
  }

  // 13. Invalidate/purge corresponding entries in the table locations cache.
  if (table_locations_cache_ &&
      (!tablets_to_add.empty() || !tablets_to_drop.empty())) {
    table_locations_cache_->Remove(table->id());
  }

  // Update table's schema related metrics after being altered.
  table->UpdateSchemaMetrics();

  background_tasks_->Wake();
  return Status::OK();
}