// Status KuduTableCreator::Create()
// Excerpt from src/kudu/client/client.cc [lines 1003:1161].

// Builds a CreateTableRequestPB from the creator's accumulated state, sends
// it to the master, and (optionally) blocks until table creation finishes.
// Returns InvalidArgument for an incomplete or inconsistent specification.
Status KuduTableCreator::Create() {
  // Sanity-check the creator's state before talking to the master: a table
  // name, a schema, and at least one partitioning dimension are mandatory.
  if (data_->table_name_.empty()) {
    return Status::InvalidArgument("Missing table name");
  }
  if (!data_->schema_) {
    return Status::InvalidArgument("Missing schema");
  }
  if (data_->partition_schema_.hash_schema().empty() &&
      !data_->partition_schema_.has_range_schema()) {
    return Status::InvalidArgument(
        "Table partitioning must be specified using "
        "add_hash_partitions or set_range_partition_columns");
  }

  // Assemble the RPC request, copying over only the optional fields that the
  // caller explicitly set.
  CreateTableRequestPB req;
  req.set_name(data_->table_name_);
  if (data_->num_replicas_) {
    req.set_num_replicas(*data_->num_replicas_);
  }
  if (data_->dimension_label_) {
    req.set_dimension_label(*data_->dimension_label_);
  }
  if (data_->extra_configs_) {
    req.mutable_extra_configs()->insert(data_->extra_configs_->begin(),
                                        data_->extra_configs_->end());
  }
  if (data_->owner_) {
    req.set_owner(*data_->owner_);
  }
  if (data_->comment_) {
    req.set_comment(*data_->comment_);
  }
  if (data_->allow_empty_partition_) {
    req.set_allow_empty_partition(*data_->allow_empty_partition_);
  }

  // Serialize the schema; write-default values are not sent at creation time.
  RETURN_NOT_OK_PREPEND(SchemaToPB(*data_->schema_->schema_, req.mutable_schema(),
                                   SCHEMA_PB_WITHOUT_WRITE_DEFAULT),
                        "Invalid schema");

  // Encode the (deprecated) range split rows, if any were supplied.
  bool have_split_rows = false;
  RowOperationsPBEncoder bounds_encoder(req.mutable_split_rows_range_bounds());
  for (const auto& split_row : data_->range_partition_splits_) {
    if (!split_row) {
      return Status::InvalidArgument("range split row must not be null");
    }
    bounds_encoder.Add(RowOperationsPB::SPLIT_ROW, *split_row);
    have_split_rows = true;
  }

  // Detect whether any range partition carries its own (non-table-wide) hash
  // schema: that changes how range bounds are encoded below.
  bool have_custom_hash_schema = false;
  for (const auto& partition : data_->range_partitions_) {
    if (!partition->data_->is_table_wide_hash_schema_) {
      have_custom_hash_schema = true;
      break;
    }
  }

  // For simplicity, don't allow having both range splits (deprecated) and
  // custom hash bucket schemas per range partition.
  if (have_split_rows && have_custom_hash_schema) {
    return Status::InvalidArgument(
        "split rows and custom hash bucket schemas for ranges are incompatible: "
        "choose one or the other");
  }

  auto* ps_pb = req.mutable_partition_schema();
  ps_pb->CopyFrom(data_->partition_schema_);
  for (const auto& partition : data_->range_partitions_) {
    const auto* range = partition->data_;
    if (!range->lower_bound_ || !range->upper_bound_) {
      return Status::InvalidArgument("range bounds must not be null");
    }

    // Translate the public bound types into row-operation encoder types.
    RowOperationsPB_Type lower_type = RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND;
    if (range->lower_bound_type_ == KuduTableCreator::INCLUSIVE_BOUND) {
      lower_type = RowOperationsPB::RANGE_LOWER_BOUND;
    }
    RowOperationsPB_Type upper_type = RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND;
    if (range->upper_bound_type_ == KuduTableCreator::EXCLUSIVE_BOUND) {
      upper_type = RowOperationsPB::RANGE_UPPER_BOUND;
    }

    if (!have_custom_hash_schema) {
      // Legacy path: every range uses the table-wide hash schema, so bounds
      // go into the same encoder as the split rows.
      bounds_encoder.Add(lower_type, *range->lower_bound_);
      bounds_encoder.Add(upper_type, *range->upper_bound_);
    } else {
      // At least one range has a custom hash schema, so each range (even a
      // table-wide one) gets its own entry with explicit bounds.
      auto* range_pb = ps_pb->add_custom_hash_schema_ranges();
      RowOperationsPBEncoder encoder(range_pb->mutable_range_bounds());
      encoder.Add(lower_type, *range->lower_bound_);
      encoder.Add(upper_type, *range->upper_bound_);
      // Now, after adding the information range bounds, add the information
      // on hash schema for the range.
      if (range->is_table_wide_hash_schema_) {
        // With the presence of a range with custom hash schema when the
        // table-wide hash schema is used for this particular range, also add
        // an element into PartitionSchemaPB::custom_hash_schema_ranges to
        // satisfy the convention used by the backend.
        range_pb->mutable_hash_schema()->CopyFrom(
            data_->partition_schema_.hash_schema());
      } else {
        // In case of per-range custom hash bucket schema, add corresponding
        // element into PartitionSchemaPB::custom_hash_schema_ranges.
        for (const auto& dim : range->hash_schema_) {
          auto* dim_pb = range_pb->add_hash_schema();
          dim_pb->set_seed(dim.seed);
          dim_pb->set_num_buckets(dim.num_buckets);
          for (const auto& col_name : dim.column_names) {
            dim_pb->add_columns()->set_name(col_name);
          }
        }
      }
    }
  }

  // Scan the schema for features the client must advertise to the master.
  bool have_immutable_column = false;
  for (size_t idx = 0; idx < data_->schema_->num_columns(); ++idx) {
    if (data_->schema_->Column(idx).is_immutable()) {
      have_immutable_column = true;
      break;
    }
  }
  const bool have_auto_incrementing =
      data_->schema_->schema_->has_auto_incrementing();

  if (data_->table_type_) {
    req.set_table_type(*data_->table_type_);
  }

  // The overall deadline: the creator's explicit timeout if one was set,
  // otherwise the client's default admin-operation timeout.
  MonoTime deadline = MonoTime::Now();
  deadline += data_->timeout_.Initialized()
      ? data_->timeout_
      : data_->client_->default_admin_operation_timeout();

  CreateTableResponsePB resp;
  RETURN_NOT_OK_PREPEND(
      data_->client_->data_->CreateTable(data_->client_,
                                         req,
                                         &resp,
                                         deadline,
                                         !data_->range_partitions_.empty(),
                                         have_custom_hash_schema,
                                         have_immutable_column,
                                         have_auto_incrementing),
      Substitute("Error creating table $0 on the master", data_->table_name_));
  // Spin until the table is fully created, if requested.
  if (data_->wait_) {
    TableIdentifierPB table_ident;
    table_ident.set_table_id(resp.table_id());
    RETURN_NOT_OK(data_->client_->data_->WaitForCreateTableToFinish(
        data_->client_, table_ident, deadline));
  }

  return Status::OK();
}