Status CatalogManager::AlterTable()

in src/kudu/master/catalog_manager.cc [3547:4005]


// Handles an AlterTable request: validates the requested alterations, applies
// them to the table's dirty (not yet committed) metadata, persists the result
// to the sys-catalog, commits the in-memory state, and finally notifies the
// affected tablets.
//
// Parameters:
//   req   - the alteration request: schema steps, range partition steps, and
//           optional new name/owner/comment/replication factor/extra
//           configs/write limits.
//   resp  - output; the table ID is set once the table is found and locked,
//           and most failures are reported via SetupError().
//   hms_notification_log_event_id
//         - moved into the sys-catalog write actions as is.
//   user  - passed to FindLockAndAuthorizeTable(); if set, it's also used to
//           verify the caller is a service user or super-user when changing
//           table write limits.
//
// Returns Status::OK() on success, including the case of an empty request
// when nothing is changed. Otherwise returns the status of the first failed
// step; nothing is committed to the in-memory state in that case.
//
// The leader lock must be held for reading by the caller.
Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
                                  AlterTableResponsePB* resp,
                                  optional<int64_t> hms_notification_log_event_id,
                                  const optional<string>& user) {
  leader_lock_.AssertAcquiredForReading();

  // 1. Group the steps into schema altering steps and partition altering steps.
  vector<AlterTableRequestPB::Step> alter_schema_steps;
  vector<AlterTableRequestPB::Step> alter_partitioning_steps;
  for (const auto& step : req.alter_schema_steps()) {
    switch (step.type()) {
      case AlterTableRequestPB::ADD_COLUMN:
      case AlterTableRequestPB::DROP_COLUMN:
      case AlterTableRequestPB::RENAME_COLUMN:
      case AlterTableRequestPB::ALTER_COLUMN: {
        alter_schema_steps.emplace_back(step);
        break;
      }
      case AlterTableRequestPB::ADD_RANGE_PARTITION:
      case AlterTableRequestPB::DROP_RANGE_PARTITION: {
        alter_partitioning_steps.emplace_back(step);
        break;
      }
      case AlterTableRequestPB::UNKNOWN: {
        return Status::InvalidArgument("Invalid alter step type", SecureShortDebugString(step));
      }
    }
  }

  // Pre-check the modifications' validity:
  // Alterations done by admin should not be combined with other table alterations.
  bool table_limit_change = req.has_disk_size_limit() ||
                            req.has_row_count_limit();
  bool other_schema_change = req.has_new_table_name() ||
                             req.has_new_table_owner() ||
                             !req.new_extra_configs().empty() ||
                             !alter_schema_steps.empty() ||
                             !alter_partitioning_steps.empty();
  if (table_limit_change && !FLAGS_enable_table_write_limit) {
    return SetupError(Status::NotSupported(
                      "altering table limit is not supported because "
                      "--enable_table_write_limit is not enabled"),
                      resp, MasterErrorPB::UNKNOWN_ERROR);
  }
  if (table_limit_change && other_schema_change) {
    return SetupError(Status::ConfigurationError(
                      "altering table limit cannot be combined with other alterations"),
                      resp, MasterErrorPB::UNKNOWN_ERROR);
  }

  // 2. Look up the table, verify that it exists, lock it for modification, and
  //    then check that the user is authorized to operate on the table.
  scoped_refptr<TableInfo> table;
  TableMetadataLock l;
  auto authz_func = [&] (const string& username,
                         const string& table_name,
                         const string& owner) {
    const string new_table = req.has_new_table_name() ?
        NormalizeTableName(req.new_table_name()) : table_name;
    // Change owner requires higher level of privilege (ALL WITH GRANT OPTION,
    // or ALL + delegate admin) than other types of alter operations, so if a
    // single alter contains an owner change as well as other changes, it's
    // sufficient to authorize only the owner change.
    if (req.has_new_table_owner()) {
      return SetupError(authz_provider_->AuthorizeChangeOwner(table_name, username,
                                                              username == owner),
                        resp, MasterErrorPB::NOT_AUTHORIZED);
    }
    if (req.has_disk_size_limit() || req.has_row_count_limit()) {
      // Table limit is used to stop writing from the table owner,
      // so, the owner is disallowed to change the table limit.
      // Note: the check is skipped when 'user' is not set.
      if (user && !master_->IsServiceUserOrSuperUser(*user)) {
        return SetupError(
              Status::NotAuthorized("must be a service user or "
              "a super user to modify table limit"),
              resp, MasterErrorPB::NOT_AUTHORIZED);
      }
    }
    return SetupError(authz_provider_->AuthorizeAlterTable(table_name, new_table, username,
                                                           username == owner),
                      resp, MasterErrorPB::NOT_AUTHORIZED);
  };
  RETURN_NOT_OK(FindLockAndAuthorizeTable(
      req, resp, LockMode::WRITE, authz_func, user, &table, &l));
  if (l.data().is_deleted()) {
    return SetupError(
        Status::NotFound("the table was deleted", l.data().pb.state_msg()),
        resp, MasterErrorPB::TABLE_NOT_FOUND);
  }
  l.mutable_data()->pb.set_alter_timestamp(time(nullptr));

  // Capture the current (pre-rename) normalized name: it's used below for
  // logging, for validating the replication factor, for removing the old entry
  // from the name map, and for the HMS update.
  string normalized_table_name = NormalizeTableName(l.data().name());
  *resp->mutable_table_id() = table->id();

  // Set the table limit. A value equal to TABLE_WRITE_DEFAULT_LIMIT clears the
  // corresponding field, a non-negative value sets the limit, and anything
  // else is rejected.
  if (table_limit_change) {
    if (req.has_disk_size_limit()) {
      if (req.disk_size_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
        l.mutable_data()->pb.clear_table_disk_size_limit();
        LOG(INFO) << Substitute("Resetting table $0 disk_size_limit to the default setting",
                                normalized_table_name);
      } else if (req.disk_size_limit() >= 0) {
        l.mutable_data()->pb.set_table_disk_size_limit(req.disk_size_limit());
        LOG(INFO) << Substitute("Setting table $0 disk_size_limit to $1",
                                 normalized_table_name, req.disk_size_limit());
      } else {
        return SetupError(Status::InvalidArgument("disk size limit must "
            "be greater than or equal to -1"),
            resp, MasterErrorPB::UNKNOWN_ERROR);
      }
    }
    if (req.has_row_count_limit()) {
      if (req.row_count_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
        l.mutable_data()->pb.clear_table_row_count_limit();
        LOG(INFO) << Substitute("Resetting table $0 row_count_limit to the default setting",
                                normalized_table_name);
      } else if (req.row_count_limit() >= 0) {
        l.mutable_data()->pb.set_table_row_count_limit(req.row_count_limit());
        LOG(INFO) << Substitute("Setting table $0 row_count_limit to $1",
                                 normalized_table_name, req.row_count_limit());
      } else {
        return SetupError(Status::InvalidArgument("row count limit must "
            "be greater than or equal to -1"),
            resp, MasterErrorPB::UNKNOWN_ERROR);
      }
    }
  }

  // 3. Calculate and validate new schema for the on-disk state, not persisted yet.
  Schema new_schema;
  ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());

  // Apply the alter steps. Note that there may be no steps, in which case this
  // is essentially a no-op. It's still important to execute because
  // ApplyAlterSchemaSteps populates 'new_schema', which is used below.
  TRACE("Apply alter schema");

  // KUDU-3577: 'needs_range_bounds_refresh' reflects whether it's necessary
  // to re-encode information on partition boundaries for ranges with custom
  // hash schemas in the system catalog
  bool needs_range_bounds_refresh = false;
  RETURN_NOT_OK(SetupError(
        ApplyAlterSchemaSteps(l.data().pb,
                              alter_schema_steps,
                              &new_schema,
                              &next_col_id,
                              &needs_range_bounds_refresh),
        resp, MasterErrorPB::INVALID_SCHEMA));

  DCHECK_NE(next_col_id, 0);
  DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
            static_cast<int>(Schema::kColumnNotFound));

  // Just validate the schema, not the name, owner, or comment (validated below).
  RETURN_NOT_OK(SetupError(
        ValidateClientSchema(nullopt, nullopt, nullopt, new_schema),
        resp, MasterErrorPB::INVALID_SCHEMA));

  // 4. Validate and try to acquire the new table name.
  string normalized_new_table_name = NormalizeTableName(req.new_table_name());
  if (req.has_new_table_name()) {
    // Validate the new table name.
    RETURN_NOT_OK(SetupError(
          ValidateIdentifier(req.new_table_name()).CloneAndPrepend("invalid table name"),
          resp, MasterErrorPB::INVALID_SCHEMA));

    std::lock_guard catalog_lock(lock_);
    TRACE("Acquired catalog manager lock");

    // Verify that a table does not already exist with the new name. This
    // also disallows no-op renames (ALTER TABLE a RENAME TO a).
    //
    // Special case: if this is a rename of a table from a non-normalized to
    // normalized name (ALTER TABLE A RENAME to a), then allow it.
    scoped_refptr<TableInfo> other_table = FindPtrOrNull(normalized_table_names_map_,
                                                         normalized_new_table_name);
    if (other_table &&
        !(table.get() == other_table.get() && l.data().name() != normalized_new_table_name)) {
      return SetupError(
          Status::AlreadyPresent(Substitute("table $0 already exists with id $1",
              normalized_new_table_name, other_table->id())),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }

    // Reserve the new table name if possible.
    if (!InsertIfNotPresent(&reserved_normalized_table_names_, normalized_new_table_name)) {
      // ServiceUnavailable will cause the client to retry the alter table
      // request. We don't want to outright fail the request with
      // 'AlreadyPresent', because a table name reservation can be rolled back
      // in the case of an error. Instead, we force the client to retry at a
      // later time.
      return SetupError(Status::ServiceUnavailable(Substitute(
              "table name $0 is already reserved", normalized_new_table_name)),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }

    l.mutable_data()->pb.set_name(normalized_new_table_name);
  }

  // Ensure that we drop our reservation upon return. This cleanup is
  // registered only after the reservation above has succeeded: all the early
  // returns in the renaming block happen before the name is reserved.
  SCOPED_CLEANUP({
    if (req.has_new_table_name()) {
      std::lock_guard l(lock_);
      CHECK_EQ(1, reserved_normalized_table_names_.erase(normalized_new_table_name));
    }
  });

  // 5. Alter the table owner.
  if (req.has_new_table_owner()) {
    RETURN_NOT_OK(SetupError(
          ValidateOwner(req.new_table_owner()).CloneAndAppend("invalid owner name"),
          resp, MasterErrorPB::INVALID_SCHEMA));

    l.mutable_data()->pb.set_owner(req.new_table_owner());
  }

  // 6. Alter the table comment.
  if (req.has_new_table_comment()) {
    RETURN_NOT_OK(SetupError(
        ValidateTableComment(req.new_table_comment()).CloneAndPrepend("invalid table comment"),
        resp, MasterErrorPB::INVALID_SCHEMA));
    l.mutable_data()->pb.set_comment(req.new_table_comment());
  }

  // 7. Alter table partitioning.
  vector<scoped_refptr<TabletInfo>> tablets_to_add;
  vector<scoped_refptr<TabletInfo>> tablets_to_drop;
  // Set by ApplyAlterPartitioningSteps(); when true, the table's entry in the
  // sys-catalog must be rewritten even if no other metadata has changed.
  bool partition_schema_updated = false;
  if (!alter_partitioning_steps.empty()) {
    TRACE("Apply alter partitioning");
    Schema client_schema;
    RETURN_NOT_OK(SetupError(SchemaFromPB(req.schema(), &client_schema),
        resp, MasterErrorPB::UNKNOWN_ERROR));
    RETURN_NOT_OK(SetupError(ApplyAlterPartitioningSteps(
        table, client_schema, alter_partitioning_steps, &l,
        &tablets_to_add, &tablets_to_drop, &partition_schema_updated),
                             resp, MasterErrorPB::UNKNOWN_ERROR));
  }

  // 8. Alter table's replication factor.
  bool num_replicas_changed = false;
  if (req.has_num_replicas()) {
    int num_replicas = req.num_replicas();
    RETURN_NOT_OK(ValidateNumberReplicas(normalized_table_name,
                                         resp, ValidateType::kAlterTable,
                                         nullopt, num_replicas));
    if (num_replicas != l.data().pb.num_replicas()) {
      num_replicas_changed = true;
      l.mutable_data()->pb.set_num_replicas(num_replicas);
    }
  }

  // 9. Alter table's extra configuration properties.
  if (!req.new_extra_configs().empty()) {
    TRACE("Apply alter extra-config");
    Map<string, string> new_extra_configs;
    RETURN_NOT_OK(ExtraConfigPBToPBMap(l.data().pb.extra_config(),
                                       &new_extra_configs));
    // Merge table's extra configuration properties.
    for (const auto& config : req.new_extra_configs()) {
      new_extra_configs[config.first] = config.second;
    }
    RETURN_NOT_OK(ExtraConfigPBFromPBMap(new_extra_configs,
                                         l.mutable_data()->pb.mutable_extra_config()));
  }

  // Set to true if columns are altered, added or dropped.
  const bool has_schema_changes = !alter_schema_steps.empty();
  // Set to true if there are schema changes, the table is renamed,
  // or if any other table properties changed.
  const bool has_metadata_changes = has_schema_changes ||
      req.has_new_table_name() || req.has_new_table_owner() ||
      !req.new_extra_configs().empty() || req.has_disk_size_limit() ||
      req.has_row_count_limit() || req.has_new_table_comment() ||
      num_replicas_changed;
  // Set to true if there are partitioning changes.
  const bool has_partitioning_changes = !alter_partitioning_steps.empty() ||
      partition_schema_updated;
  // Set to true if metadata changes need to be applied to existing tablets.
  const bool has_metadata_changes_for_existing_tablets =
    has_metadata_changes && table->num_tablets() > tablets_to_drop.size();
  // Set to true if the table's own persistent metadata has been modified.
  // The very same predicate drives both persisting the table's entry into the
  // sys-catalog (step 11) and committing the table's in-memory state (after
  // step 12), so the on-disk and in-memory states cannot diverge. In
  // particular, an update of the partition schema alone must be persisted,
  // not just committed in memory.
  const bool table_metadata_modified = !tablets_to_add.empty() ||
      has_metadata_changes || partition_schema_updated;

  // Skip empty requests...
  if (!has_metadata_changes && !has_partitioning_changes) {
    return Status::OK();
  }

  // 10. Serialize the schema and increment the version number.
  if (needs_range_bounds_refresh) {
    // KUDU-3577: if necessary, re-encode the information on partition
    // boundaries for ranges with custom hash schemas
    Schema cur_schema;  // the current table schema before being altered
    RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &cur_schema));
    PartitionSchema partition_schema;
    RETURN_NOT_OK(PartitionSchema::FromPB(
        l.data().pb.partition_schema(), cur_schema, &partition_schema));
    RETURN_NOT_OK(partition_schema.ToPB(
        new_schema, l.mutable_data()->pb.mutable_partition_schema()));
  }
  // Preserve the pre-alter schema as the 'fully applied' one before the
  // schema field is overwritten below, unless it has already been recorded.
  if (has_metadata_changes_for_existing_tablets && !l.data().pb.has_fully_applied_schema()) {
    l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
  }
  if (has_schema_changes) {
    CHECK_OK(SchemaToPB(new_schema, l.mutable_data()->pb.mutable_schema()));
  }
  if (has_metadata_changes) {
    l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
    l.mutable_data()->pb.set_next_column_id(next_col_id);
  }
  if (!tablets_to_add.empty() || has_metadata_changes_for_existing_tablets) {
    // If some tablet schemas need to be updated or there are any new tablets,
    // set the table state to ALTERING, so that IsAlterTableDone RPCs will wait
    // for the schema updates and tablets to be running.
    l.mutable_data()->set_state(SysTablesEntryPB::ALTERING,
                                Substitute("Alter Table version=$0 ts=$1",
                                           l.mutable_data()->pb.version(),
                                           LocalTimeAsString()));
  }

  const time_t timestamp = time(nullptr);
  const string deletion_msg = "Partition dropped at " + TimestampAsString(timestamp);
  TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
  TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);

  // 11. Update sys-catalog with the new table schema and tablets to add/drop.
  TRACE("Updating metadata on disk");
  {
    SysCatalogTable::Actions actions;
    actions.hms_notification_log_event_id =
        std::move(hms_notification_log_event_id);
    if (table_metadata_modified) {
      // If anything modified the table's persistent metadata (including the
      // partition schema), then sync it to the sys catalog.
      actions.table_to_update = table;
    }
    actions.tablets_to_add = tablets_to_add;

    tablets_to_add_lock.AddMutableInfos(tablets_to_add);
    tablets_to_drop_lock.AddMutableInfos(tablets_to_drop);
    tablets_to_drop_lock.Lock(LockMode::WRITE);
    for (auto& tablet : tablets_to_drop) {
      tablet->mutable_metadata()->mutable_dirty()->set_state(
          SysTabletsEntryPB::DELETED, deletion_msg);
      tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
    }
    actions.tablets_to_update = tablets_to_drop;

    Status s = sys_catalog_->Write(std::move(actions));
    if (PREDICT_FALSE(!s.ok())) {
      s = s.CloneAndPrepend("an error occurred while updating the sys-catalog");
      LOG(WARNING) << s.ToString();
      CheckIfNoLongerLeaderAndSetupError(s, resp);
      return s;
    }
  }

  // 12. Commit the in-memory state.
  TRACE("Committing alterations to in-memory state");
  {
    // Commit new tablet in-memory state. This doesn't require taking the global
    // lock since the new tablets are not yet visible, because they haven't been
    // added to the table or tablet index.
    tablets_to_add_lock.Commit();

    // Take the global catalog manager lock in order to modify the global table
    // and tablets indices.
    std::lock_guard lock(lock_);
    if (req.has_new_table_name()) {
      if (normalized_table_names_map_.erase(normalized_table_name) != 1) {
        LOG(FATAL) << "Could not remove table " << table->ToString()
                   << " from map in response to AlterTable request: "
                   << SecureShortDebugString(req);
      }
      InsertOrDie(&normalized_table_names_map_, normalized_new_table_name, table);

      // Alter the table name in the attributes of the metrics.
      table->UpdateMetricsAttrs(normalized_new_table_name);
    }

    // Insert new tablets into the global tablet map. After this, the tablets
    // will be visible in GetTabletLocations RPCs.
    for (const auto& tablet : tablets_to_add) {
      InsertOrDie(&tablet_map_, tablet->id(), tablet);
    }
  }

  // Add and remove new tablets from the table. This makes the tablets visible
  // to GetTableLocations RPCs. This doesn't need to happen under the global
  // lock, since:
  //  * clients can not know the new tablet IDs, so GetTabletLocations RPCs
  //    are impossible.
  //  * the new tablets can not heartbeat yet, since they don't get created
  //    until further down.
  //
  // We acquire new READ locks for tablets_to_add because we've already
  // committed our WRITE locks above, and reordering the operations such that
  // the WRITE locks could be reused would open a short window wherein
  // uninitialized tablet state is published to the world.
  for (const auto& tablet : tablets_to_add) {
    tablet->metadata().ReadLock();
  }
  table->AddRemoveTablets(tablets_to_add, tablets_to_drop);
  for (const auto& tablet : tablets_to_add) {
    tablet->metadata().ReadUnlock();
  }

  // Commit state change for dropped tablets. This comes after removing the
  // tablets from their associated tables so that if a GetTableLocations or
  // GetTabletLocations returns a deleted tablet, the retry will never include
  // the tablet again.
  tablets_to_drop_lock.Commit();

  // If there are schema changes or the owner or comment changed, then update the
  // entry in the Hive Metastore. This is done on a best-effort basis, since Kudu
  // is the source of truth for table schema information, and the table has already
  // been altered in the Kudu catalog via the successful sys-table write above.
  if (hms_catalog_ && (has_schema_changes ||
      req.has_new_table_owner() || req.has_new_table_comment())) {
    // Sanity check: if there are schema changes then this is necessarily not a
    // table rename, since we split out the rename portion into its own
    // 'transaction' which is serialized through the HMS.
    DCHECK(!req.has_new_table_name());
    auto s = hms_catalog_->AlterTable(
        table->id(), normalized_table_name, normalized_table_name,
        GetClusterId(), l.mutable_data()->owner(), new_schema, l.mutable_data()->comment());
    if (PREDICT_TRUE(s.ok())) {
      LOG(INFO) << Substitute(
          "altered HMS schema for table $0", table->ToString());
    } else {
      LOG(WARNING) << Substitute(
          "failed to alter HMS schema for table $0, "
          "HMS schema information will be stale: $1",
          table->ToString(), s.ToString());
    }
  }

  // Publish the table's new metadata if it has been modified (and persisted
  // in step 11); otherwise just release the lock, discarding the dirty state.
  if (table_metadata_modified) {
    l.Commit();
  } else {
    l.Unlock();
  }

  SendAlterTableRequest(table);
  for (const auto& tablet : tablets_to_drop) {
    TabletMetadataLock l(tablet.get(), LockMode::READ);
    SendDeleteTabletRequest(tablet, l, deletion_msg);
  }

  // 13. Invalidate/purge corresponding entries in the table locations cache.
  if (table_locations_cache_ &&
      (!tablets_to_add.empty() || !tablets_to_drop.empty())) {
    table_locations_cache_->Remove(table->id());
  }

  // Update table's schema related metrics after being altered.
  table->UpdateSchemaMetrics();

  background_tasks_->Wake();
  return Status::OK();
}