// src/kudu/master/catalog_manager.cc [1901:2225]
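// Does the heavy lifting of table creation: validates the request and schema,
// builds the partitioning, reserves the table name, creates the HMS catalog
// entry when the integration is enabled, persists the new table and its
// tablets to the sys catalog, and finally publishes them in the in-memory maps.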
Status CatalogManager::CreateTableHelper(const CreateTableRequestPB* orig_req,
                                         CreateTableResponsePB* resp,
                                         rpc::RpcContext* rpc,
                                         const optional<string>& username) {
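  // Only the leader master may create tables; the caller must already hold
  // the leader lock for reading.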
  leader_lock_.AssertAcquiredForReading();

  // Copy the request, so we can fill in some defaults.
  CreateTableRequestPB req = *orig_req;

  optional<string> user;
  if (rpc) {
    user.emplace(rpc->remote_user().username());
  } else {
    user = username;
  }

  // Default the owner if it isn't set.
  if (user && !req.has_owner()) {
    req.set_owner(*user);
  }
  // Do some fix-up of any defaults specified on columns.
  // Clients are only expected to pass the default value in the 'read_default'
  // field, but we need to write the schema to disk including the default
  // as both the 'read' and 'write' default. It's easier to do this fix-up
  // on the protobuf here.
  for (int i = 0; i < req.schema().columns_size(); i++) {
    auto* col = req.mutable_schema()->mutable_columns(i);
    RETURN_NOT_OK(SetupError(ProcessColumnPBDefaults(col), resp, MasterErrorPB::INVALID_SCHEMA));
  }
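
  // Table types other than DEFAULT_TABLE are system tables (for example, the
  // transaction status table); as checked in the 'else' branch below, only
  // service users or super users may create them.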
  bool is_user_table = req.table_type() == TableTypePB::DEFAULT_TABLE;
  const string& normalized_table_name = NormalizeTableName(req.name());
  if (is_user_table) {
    // a. Validate the user request.
    if (rpc) {
      DCHECK(user.has_value());
      RETURN_NOT_OK(SetupError(
          authz_provider_->AuthorizeCreateTable(normalized_table_name, *user, req.owner()),
          resp, MasterErrorPB::NOT_AUTHORIZED));
    }

    // If the HMS integration is enabled, wait for the notification log listener
    // to catch up. This reduces the likelihood of attempting to create a table
    // with a name that conflicts with a table that has just been deleted or
    // renamed in the HMS.
    RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
  } else {
    if (user && !master_->IsServiceUserOrSuperUser(*user)) {
      return SetupError(
          Status::NotAuthorized("must be a service user or super user to create system tables"),
          resp, MasterErrorPB::NOT_AUTHORIZED);
    }
  }
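
  // Convert the client's schema protobuf into the internal Schema
  // representation and validate it along with the table name, owner,
  // and comment.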
  Schema client_schema;
  RETURN_NOT_OK(SchemaFromPB(req.schema(), &client_schema));
  RETURN_NOT_OK(SetupError(ValidateClientSchema(
      normalized_table_name, req.owner(), req.comment(), client_schema),
      resp, MasterErrorPB::INVALID_SCHEMA));
  if (client_schema.has_column_ids()) {
    return SetupError(Status::InvalidArgument("user requests should not have Column IDs"),
                      resp, MasterErrorPB::INVALID_SCHEMA);
  }
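
  // Column IDs are assigned by the master, not by the client: attach them to
  // the server-side copy of the schema.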
  const Schema schema = client_schema.CopyWithColumnIds();

  // If the client did not set a partition schema in the create table request,
  // the default partition schema (no hash bucket components, and range
  // partitioning on the primary key columns) will be used.
  PartitionSchema partition_schema;
  PartitionSchema::RangesWithHashSchemas ranges_with_hash_schemas;
  RETURN_NOT_OK(SetupError(
      PartitionSchema::FromPB(req.partition_schema(),
                              schema,
                              &partition_schema,
                              &ranges_with_hash_schemas),
      resp, MasterErrorPB::INVALID_SCHEMA));

  // Decode split rows and range bounds.
  vector<KuduPartialRow> split_rows;
  vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;

  RowOperationsPBDecoder decoder(req.mutable_split_rows_range_bounds(),
                                 &client_schema, &schema, nullptr);
  vector<DecodedRowOperation> ops;
  RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
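
  // Range bound operations come in pairs: a lower bound immediately followed
  // by the corresponding upper bound. Exclusive lower bounds and inclusive
  // upper bounds are canonicalized to the inclusive-lower/exclusive-upper
  // form used internally.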
  for (size_t i = 0; i < ops.size(); ++i) {
    const DecodedRowOperation& op = ops[i];
    switch (op.type) {
      case RowOperationsPB::SPLIT_ROW: {
        split_rows.push_back(*op.split_row);
        break;
      }
      case RowOperationsPB::RANGE_LOWER_BOUND:
      case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND: {
        i += 1;
        if (i >= ops.size() ||
            (ops[i].type != RowOperationsPB::RANGE_UPPER_BOUND &&
             ops[i].type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND)) {
          return SetupError(
              Status::InvalidArgument("missing upper range bound in create table request"),
              resp, MasterErrorPB::UNKNOWN_ERROR);
        }
        if (op.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
          RETURN_NOT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(
              op.split_row.get()));
        }
        if (ops[i].type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
          RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
              ops[i].split_row.get()));
        }
        range_bounds.emplace_back(*op.split_row, *ops[i].split_row);
        break;
      }
      default:
        return Status::InvalidArgument(
            Substitute("Illegal row operation type in create table request: $0", op.type));
    }
  }
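
  // Build the partitions either from the per-range custom hash schemas (when
  // the feature flag is enabled and the request contains such ranges), or
  // from the table-wide partition schema with the decoded split rows and
  // range bounds.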
  vector<Partition> partitions;
  if (const auto& ps = req.partition_schema();
      FLAGS_enable_per_range_hash_schemas && !ps.custom_hash_schema_ranges().empty()) {
    if (!split_rows.empty()) {
      return Status::InvalidArgument(
          "split rows and custom hash schema ranges must not both be "
          "populated at the same time");
    }
    if (!range_bounds.empty()) {
      return Status::InvalidArgument(
          "range bounds and custom hash schema ranges must not both be "
          "populated at the same time");
    }
    // Create partitions based on the specified ranges and their hash schemas.
    RETURN_NOT_OK(partition_schema.CreatePartitions(
        ranges_with_hash_schemas, schema, &partitions));
  } else {
    // Create partitions based on the specified partition schema and split rows.
    RETURN_NOT_OK(partition_schema.CreatePartitions(
        split_rows, range_bounds, schema, &partitions, req.allow_empty_partition()));
  }

  // Enforce the restriction that all ranges have the same number of hash
  // dimensions, and that the table-wide hash schema has the same number of
  // hash dimensions as the partitions with custom hash schemas.
  //
  // TODO(aserbin): remove the restriction once the rest of the code is ready
  //                to handle range partitions with an arbitrary number of
  //                hash dimensions in hash schemas
  const auto hash_dimensions_num = partition_schema.hash_schema().size();
  for (const auto& p : partitions) {
    if (p.hash_buckets().size() != hash_dimensions_num) {
      return Status::NotSupported(
          "varying number of hash dimensions per range is not yet supported");
    }
  }

  // If the client didn't specify num_replicas, use the default.
  if (!req.has_num_replicas()) {
    req.set_num_replicas(FLAGS_default_num_replicas);
  }
  const auto num_replicas = req.num_replicas();
  RETURN_NOT_OK(ValidateNumberReplicas(normalized_table_name,
                                       resp, ValidateType::kCreateTable,
                                       partitions.size(), num_replicas));

  // Verify the table's extra configuration properties.
  TableExtraConfigPB extra_config_pb;
  RETURN_NOT_OK(ExtraConfigPBFromPBMap(req.extra_configs(), &extra_config_pb));
  scoped_refptr<TableInfo> table;
  {
    std::lock_guard l(lock_);
    TRACE("Acquired catalog manager lock");

    // b. Verify that the table does not exist.
    table = FindTableWithNameUnlocked(normalized_table_name);
    if (table != nullptr) {
      return SetupError(Status::AlreadyPresent(Substitute(
          "table $0 already exists with id $1", normalized_table_name, table->id())),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }

    // c. Reserve the table name if possible.
    if (!InsertIfNotPresent(&reserved_normalized_table_names_, normalized_table_name)) {
      // ServiceUnavailable will cause the client to retry the create table
      // request. We don't want to outright fail the request with
      // 'AlreadyPresent', because a table name reservation can be rolled back
      // in the case of an error. Instead, we force the client to retry at a
      // later time.
      return SetupError(Status::ServiceUnavailable(Substitute(
          "new table name $0 is already reserved", normalized_table_name)),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }
  }

  // Ensure that we drop the name reservation upon return.
  SCOPED_CLEANUP({
    std::lock_guard l(lock_);
    CHECK_EQ(1, reserved_normalized_table_names_.erase(normalized_table_name));
  });

  // d. Create the in-memory representation of the new table and its tablets.
  //    It's not yet in any global maps; that happens in step i below.
  table = CreateTableInfo(req, schema, partition_schema, std::move(extra_config_pb));
  vector<scoped_refptr<TabletInfo>> tablets;
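
  // Until the in-memory state is committed in step g, any early return must
  // abort the pending copy-on-write mutations on the table and tablets.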
  auto abort_mutations = MakeScopedCleanup([&table, &tablets]() {
    table->mutable_metadata()->AbortMutation();
    for (const auto& e : tablets) {
      e->mutable_metadata()->AbortMutation();
    }
  });
  const optional<string> dimension_label =
      req.has_dimension_label() ? make_optional(req.dimension_label()) : nullopt;
  for (const Partition& partition : partitions) {
    PartitionPB partition_pb;
    partition.ToPB(&partition_pb);
    tablets.emplace_back(CreateTabletInfo(table, partition_pb, dimension_label));
  }
  TRACE("Created new table and tablet info");

  // NOTE: the table and tablets are already locked for write at this point,
  // since the CreateTableInfo/CreateTabletInfo functions leave them in that
  // state. They will get committed at the end of this function.

  // Sanity check: the table and tablets should all be in "preparing" state.
  CHECK_EQ(SysTablesEntryPB::PREPARING, table->metadata().dirty().pb.state());
  for (const auto& tablet : tablets) {
    CHECK_EQ(SysTabletsEntryPB::PREPARING, tablet->metadata().dirty().pb.state());
  }
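
  // The table record is written out in RUNNING state; the tablets remain in
  // PREPARING until the background tasks assign and create their replicas.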
  table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);

  // e. Create the table in the HMS.
  //
  // It is critical that this step happen before writing the table to the sys
  // catalog, since this step validates that the table name is available in
  // the HMS catalog.
  if (hms_catalog_ && is_user_table) {
    CHECK(rpc);
    Status s = hms_catalog_->CreateTable(
        table->id(), normalized_table_name, GetClusterId(), req.owner(), schema, req.comment());
    if (!s.ok()) {
      s = s.CloneAndPrepend(Substitute(
          "failed to create HMS catalog entry for table $0", table->ToString()));
      LOG(WARNING) << s.ToString();
      return SetupError(std::move(s), resp, MasterErrorPB::HIVE_METASTORE_ERROR);
    }
    TRACE("Created new table in HMS catalog");
    LOG(INFO) << Substitute("created HMS catalog entry for table $0",
                            table->ToString());
  }

  // Delete the new HMS entry if we exit early.
  auto abort_hms = MakeScopedCleanup([&] {
    // TODO(dan): figure out how to test this.
    if (hms_catalog_ && is_user_table) {
      TRACE("Rolling back HMS table creation");
      auto s = hms_catalog_->DropTable(table->id(), normalized_table_name);
      if (s.ok()) {
        LOG(INFO) << Substitute(
            "deleted orphaned HMS catalog entry for table $0", table->ToString());
      } else {
        LOG(WARNING) << Substitute(
            "failed to delete orphaned HMS catalog entry for table $0: $1",
            table->ToString(), s.ToString());
      }
    }
  });

  // f. Write table and tablets to sys-catalog.
  {
    SysCatalogTable::Actions actions;
    actions.table_to_add = table;
    actions.tablets_to_add = tablets;
    Status s = sys_catalog_->Write(std::move(actions));
    if (PREDICT_FALSE(!s.ok())) {
      s = s.CloneAndPrepend("an error occurred while writing to the sys-catalog");
      LOG(WARNING) << s.ToString();
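      // A failed sys catalog write may indicate that this master is no longer
      // the leader; if so, surface that to the client so it retries against
      // the new leader.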
      CheckIfNoLongerLeaderAndSetupError(s, resp);
      return s;
    }
  }
  TRACE("Wrote table and tablets to system table");

  // g. Commit the in-memory state.
  abort_hms.cancel();
  table->mutable_metadata()->CommitMutation();
  for (const auto& tablet : tablets) {
    tablet->mutable_metadata()->CommitMutation();
  }
  abort_mutations.cancel();

  // h. Add the tablets to the table.
  //
  // We can't reuse the above WRITE tablet locks for this because
  // AddRemoveTablets() will read from the clean state, which is empty for
  // these brand new tablets.
  for (const auto& tablet : tablets) {
    tablet->metadata().ReadLock();
  }
  table->AddRemoveTablets(tablets, {});
  for (const auto& tablet : tablets) {
    tablet->metadata().ReadUnlock();
  }

  // i. Make the new table and tablets visible in the catalog.
  {
    std::lock_guard l(lock_);

    table_ids_map_[table->id()] = table;
    normalized_table_names_map_[normalized_table_name] = table;
    for (const auto& tablet : tablets) {
      InsertOrDie(&tablet_map_, tablet->id(), tablet);
    }
  }
  TRACE("Inserted table and tablets into CatalogManager maps");

  // Update the table's schema-related metrics now that the table exists.
  table->UpdateSchemaMetrics();

  resp->set_table_id(table->id());
  VLOG(1) << "Created table " << table->ToString();
  background_tasks_->Wake();
  return Status::OK();
}