Status CatalogManager::CreateTableHelper()

in src/kudu/master/catalog_manager.cc [1901:2225]


Status CatalogManager::CreateTableHelper(const CreateTableRequestPB* orig_req,
                                         CreateTableResponsePB* resp,
                                         rpc::RpcContext* rpc,
                                         const optional<string>& username) {
  leader_lock_.AssertAcquiredForReading();

  // Copy the request, so we can fill in some defaults.
  CreateTableRequestPB req = *orig_req;
  optional<string> user;

  if (rpc) {
    user.emplace(rpc->remote_user().username());
  } else {
    user = username;
  }
  // Default the owner if it isn't set.
  if (user && !req.has_owner()) {
    req.set_owner(*user);
  }

  // Do some fix-up of any defaults specified on columns.
  // Clients are only expected to pass the default value in the 'read_default'
  // field, but we need to write the schema to disk including the default
  // as both the 'read' and 'write' default. It's easier to do this fix-up
  // on the protobuf here.
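  // (In Kudu's schema model, a column's 'write default' is applied to new
  // inserts that omit the column, while the 'read default' is the value
  // reported for rows written before the column had that default; at table
  // creation time the two are necessarily the same value.)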
  for (int i = 0; i < req.schema().columns_size(); i++) {
    auto* col = req.mutable_schema()->mutable_columns(i);
    RETURN_NOT_OK(SetupError(ProcessColumnPBDefaults(col), resp, MasterErrorPB::INVALID_SCHEMA));
  }

  bool is_user_table = req.table_type() == TableTypePB::DEFAULT_TABLE;
  const string& normalized_table_name = NormalizeTableName(req.name());
  if (is_user_table) {
    // a. Validate the user request.
    if (rpc) {
      DCHECK(user.has_value());
      RETURN_NOT_OK(SetupError(
          authz_provider_->AuthorizeCreateTable(normalized_table_name, *user, req.owner()),
          resp, MasterErrorPB::NOT_AUTHORIZED));
    }

    // If the HMS integration is enabled, wait for the notification log listener
    // to catch up. This reduces the likelihood of attempting to create a table
    // with a name that conflicts with a table that has just been deleted or
    // renamed in the HMS.
    RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
  } else {
    if (user && !master_->IsServiceUserOrSuperUser(*user)) {
      return SetupError(
          Status::NotAuthorized("must be a service user or super user to create system tables"),
          resp, MasterErrorPB::NOT_AUTHORIZED);
    }
  }

  Schema client_schema;
  RETURN_NOT_OK(SchemaFromPB(req.schema(), &client_schema));

  RETURN_NOT_OK(SetupError(ValidateClientSchema(
      normalized_table_name, req.owner(), req.comment(), client_schema),
      resp, MasterErrorPB::INVALID_SCHEMA));
  if (client_schema.has_column_ids()) {
    return SetupError(Status::InvalidArgument("user requests should not have Column IDs"),
                      resp, MasterErrorPB::INVALID_SCHEMA);
  }
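  // Column IDs are assigned by the master itself; clients must submit schemas
  // without them (enforced just above).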
  const Schema schema = client_schema.CopyWithColumnIds();

  // If the client did not set a partition schema in the create table request,
  // the default partition schema (no hash bucket components and a range
  // partitioned on the primary key columns) will be used.
  PartitionSchema partition_schema;
  PartitionSchema::RangesWithHashSchemas ranges_with_hash_schemas;
  RETURN_NOT_OK(SetupError(
      PartitionSchema::FromPB(req.partition_schema(),
                              schema,
                              &partition_schema,
                              &ranges_with_hash_schemas),
      resp, MasterErrorPB::INVALID_SCHEMA));

  // Decode split rows and range bounds.
  vector<KuduPartialRow> split_rows;
  vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;

  RowOperationsPBDecoder decoder(req.mutable_split_rows_range_bounds(),
                                 &client_schema, &schema, nullptr);
  vector<DecodedRowOperation> ops;
  RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));

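  // The decoded operations form a flat stream in which each range lower bound
  // (inclusive or exclusive) must be immediately followed by its matching
  // upper bound; the loop below canonicalizes each pair to the
  // [inclusive lower, exclusive upper) form that PartitionSchema expects.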
  for (size_t i = 0; i < ops.size(); ++i) {
    const DecodedRowOperation& op = ops[i];
    switch (op.type) {
      case RowOperationsPB::SPLIT_ROW: {
        split_rows.push_back(*op.split_row);
        break;
      }
      case RowOperationsPB::RANGE_LOWER_BOUND:
      case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND: {
        i += 1;
        if (i >= ops.size() ||
            (ops[i].type != RowOperationsPB::RANGE_UPPER_BOUND &&
             ops[i].type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND)) {
          return SetupError(
              Status::InvalidArgument("missing upper range bound in create table request"),
              resp, MasterErrorPB::UNKNOWN_ERROR);
        }

        if (op.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
          RETURN_NOT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(
              op.split_row.get()));
        }
        if (ops[i].type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
          RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
              ops[i].split_row.get()));
        }
        range_bounds.emplace_back(*op.split_row, *ops[i].split_row);
        break;
      }
      default:
        return Status::InvalidArgument(Substitute(
            "Illegal row operation type in create table request: $0", op.type));
    }
  }

  vector<Partition> partitions;
  if (const auto& ps = req.partition_schema();
      FLAGS_enable_per_range_hash_schemas && !ps.custom_hash_schema_ranges().empty()) {
    if (!split_rows.empty()) {
      return Status::InvalidArgument(
          "both split rows and custom hash schema ranges must not be "
          "populated at the same time");
    }
    if (!range_bounds.empty()) {
      return Status::InvalidArgument(
          "both range bounds and custom hash schema ranges must not be "
          "populated at the same time");
    }
    // Create partitions based on the specified ranges and their hash schemas.
    RETURN_NOT_OK(partition_schema.CreatePartitions(
        ranges_with_hash_schemas, schema, &partitions));
  } else {
    // Create partitions based on specified partition schema and split rows.
    RETURN_NOT_OK(partition_schema.CreatePartitions(
        split_rows, range_bounds, schema, &partitions, req.allow_empty_partition()));
  }

  // Check the restriction on the same number of hash dimensions across all the
  // ranges. Also, check that the table-wide hash schema has the same number
  // of hash dimensions as all the partitions with custom hash schemas.
  //
  // TODO(aserbin): remove the restriction once the rest of the code is ready
  //                to handle range partitions with arbitrary number of hash
  //                dimensions in hash schemas
  const auto hash_dimensions_num = partition_schema.hash_schema().size();
  for (const auto& p : partitions) {
    if (p.hash_buckets().size() != hash_dimensions_num) {
      return Status::NotSupported(
          "varying number of hash dimensions per range is not yet supported");
    }
  }

  // If they didn't specify a num_replicas, set it based on the default.
  if (!req.has_num_replicas()) {
    req.set_num_replicas(FLAGS_default_num_replicas);
  }

  const auto num_replicas = req.num_replicas();
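  // Among other constraints, this rejects a replication factor that exceeds
  // the configured maximum or that the currently available tablet servers
  // cannot satisfy.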
  RETURN_NOT_OK(ValidateNumberReplicas(normalized_table_name,
                                       resp, ValidateType::kCreateTable,
                                       partitions.size(), num_replicas));

  // Verify the table's extra configuration properties.
  TableExtraConfigPB extra_config_pb;
  RETURN_NOT_OK(ExtraConfigPBFromPBMap(req.extra_configs(), &extra_config_pb));

  scoped_refptr<TableInfo> table;
  {
    std::lock_guard l(lock_);
    TRACE("Acquired catalog manager lock");

    // b. Verify that the table does not exist.
    table = FindTableWithNameUnlocked(normalized_table_name);
    if (table != nullptr) {
      return SetupError(Status::AlreadyPresent(Substitute(
              "table $0 already exists with id $1", normalized_table_name, table->id())),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }

    // c. Reserve the table name if possible.
    if (!InsertIfNotPresent(&reserved_normalized_table_names_, normalized_table_name)) {
      // ServiceUnavailable will cause the client to retry the create table
      // request. We don't want to outright fail the request with
      // 'AlreadyPresent', because a table name reservation can be rolled back
      // in the case of an error. Instead, we force the client to retry at a
      // later time.
      return SetupError(Status::ServiceUnavailable(Substitute(
              "new table name $0 is already reserved", normalized_table_name)),
          resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
    }
  }

  // Ensure that we drop the name reservation upon return.
  SCOPED_CLEANUP({
    std::lock_guard l(lock_);
    CHECK_EQ(1, reserved_normalized_table_names_.erase(normalized_table_name));
  });

  // d. Create the in-memory representation of the new table and its tablets.
  //    It's not yet in any global maps; that will happen in step g below.
  table = CreateTableInfo(req, schema, partition_schema, std::move(extra_config_pb));
  vector<scoped_refptr<TabletInfo>> tablets;
  auto abort_mutations = MakeScopedCleanup([&table, &tablets]() {
    table->mutable_metadata()->AbortMutation();
    for (const auto& e : tablets) {
      e->mutable_metadata()->AbortMutation();
    }
  });
  const optional<string> dimension_label =
      req.has_dimension_label() ? make_optional(req.dimension_label()) : nullopt;
  for (const Partition& partition : partitions) {
    PartitionPB partition_pb;
    partition.ToPB(&partition_pb);
    tablets.emplace_back(CreateTabletInfo(table, partition_pb, dimension_label));
  }
  TRACE("Created new table and tablet info");

  // NOTE: the table and tablets are already locked for write at this point,
  // since the CreateTableInfo/CreateTabletInfo functions leave them in that state.
  // They will get committed at the end of this function.
  // Sanity check: the table and tablets should all be in "preparing" state.
  CHECK_EQ(SysTablesEntryPB::PREPARING, table->metadata().dirty().pb.state());
  for (const auto& tablet : tablets) {
    CHECK_EQ(SysTabletsEntryPB::PREPARING, tablet->metadata().dirty().pb.state());
  }
  table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);

  // e. Create the table in the HMS.
  //
  // It is critical that this step happen before writing the table to the sys catalog,
  // since this step validates that the table name is available in the HMS catalog.
  if (hms_catalog_ && is_user_table) {
    CHECK(rpc);
    Status s = hms_catalog_->CreateTable(
        table->id(), normalized_table_name, GetClusterId(), req.owner(), schema, req.comment());
    if (!s.ok()) {
      s = s.CloneAndPrepend(Substitute(
          "failed to create HMS catalog entry for table $0", table->ToString()));
      LOG(WARNING) << s.ToString();
      return SetupError(std::move(s), resp, MasterErrorPB::HIVE_METASTORE_ERROR);
    }
    TRACE("Created new table in HMS catalog");
    LOG(INFO) << Substitute("created HMS catalog entry for table $0",
                            table->ToString());
  }
  // Delete the new HMS entry if we exit early.
  auto abort_hms = MakeScopedCleanup([&] {
      // TODO(dan): figure out how to test this.
      if (hms_catalog_ && is_user_table) {
        TRACE("Rolling back HMS table creation");
        auto s = hms_catalog_->DropTable(table->id(), normalized_table_name);
        if (s.ok()) {
          LOG(INFO) << Substitute(
              "deleted orphaned HMS catalog entry for table $0", table->ToString());
        } else {
          LOG(WARNING) << Substitute(
              "failed to delete orphaned HMS catalog entry for table $0: $1",
              table->ToString(), s.ToString());
        }
      }
  });

  // f. Write table and tablets to sys-catalog.
  {
    SysCatalogTable::Actions actions;
    actions.table_to_add = table;
    actions.tablets_to_add = tablets;
    Status s = sys_catalog_->Write(std::move(actions));
    if (PREDICT_FALSE(!s.ok())) {
      s = s.CloneAndPrepend("an error occurred while writing to the sys-catalog");
      LOG(WARNING) << s.ToString();
      CheckIfNoLongerLeaderAndSetupError(s, resp);
      return s;
    }
  }
  TRACE("Wrote table and tablets to system table");

  // g. Commit the in-memory state.
  abort_hms.cancel();
  table->mutable_metadata()->CommitMutation();

  for (const auto& tablet : tablets) {
    tablet->mutable_metadata()->CommitMutation();
  }
  abort_mutations.cancel();

  // h. Add the tablets to the table.
  //
  // We can't reuse the above WRITE tablet locks for this because
  // AddRemoveTablets() will read from the clean state, which is empty for
  // these brand new tablets.
  for (const auto& tablet : tablets) {
    tablet->metadata().ReadLock();
  }
  table->AddRemoveTablets(tablets, {});
  for (const auto& tablet : tablets) {
    tablet->metadata().ReadUnlock();
  }

  // i. Make the new table and tablets visible in the catalog.
  {
    std::lock_guard l(lock_);

    table_ids_map_[table->id()] = table;
    normalized_table_names_map_[normalized_table_name] = table;
    for (const auto& tablet : tablets) {
      InsertOrDie(&tablet_map_, tablet->id(), tablet);
    }
  }
  TRACE("Inserted table and tablets into CatalogManager maps");

  // Update table's schema related metrics after being created.
  table->UpdateSchemaMetrics();

  resp->set_table_id(table->id());
  VLOG(1) << "Created table " << table->ToString();
  background_tasks_->Wake();
  return Status::OK();
}
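
For reference, here is a minimal client-side sketch that exercises this code
path through the public C++ client API (kudu::client). The master address,
table name, schema, and bound values are all illustrative; the point is that
each add_range_partition() call is what ultimately produces the
RANGE_LOWER_BOUND/RANGE_UPPER_BOUND operation pairs decoded by the loop above.

#include <memory>

#include "kudu/client/client.h"

using kudu::KuduPartialRow;
using kudu::Status;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTableCreator;

Status CreateExampleTable() {
  // Connect to the cluster (address is illustrative).
  kudu::client::sp::shared_ptr<KuduClient> client;
  KuduClientBuilder builder;
  builder.add_master_server_addr("master-host:7051");
  KUDU_RETURN_NOT_OK(builder.Build(&client));

  // Build a simple schema; as noted above, the master assigns column IDs.
  KuduSchema schema;
  KuduSchemaBuilder b;
  b.AddColumn("key")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
  b.AddColumn("val")->Type(KuduColumnSchema::STRING);
  KUDU_RETURN_NOT_OK(b.Build(&schema));

  std::unique_ptr<KuduTableCreator> creator(client->NewTableCreator());
  creator->table_name("example_table")
      .schema(&schema)
      .set_range_partition_columns({"key"})
      .num_replicas(3);

  // One range partition covering keys [0, 100). The bound rows are encoded
  // into the request's split_rows_range_bounds field; ownership of the rows
  // passes to the table creator.
  KuduPartialRow* lower = schema.NewRow();
  KUDU_RETURN_NOT_OK(lower->SetInt64("key", 0));
  KuduPartialRow* upper = schema.NewRow();
  KUDU_RETURN_NOT_OK(upper->SetInt64("key", 100));
  creator->add_range_partition(lower, upper);

  // Issues the CreateTable RPC that is ultimately served by
  // CatalogManager::CreateTableHelper() above.
  return creator->Create();
}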