in src/kudu/client/client.cc [1003:1161]
// Validates the settings accumulated in this creator, assembles a
// CreateTableRequestPB from them and submits it through the client's
// CreateTable() call.
//
// Returns Status::InvalidArgument when the table name, the schema or the
// partitioning is missing, when a range split row or a range bound is null,
// or when range splits are combined with per-range custom hash schemas.
// Failures of the schema conversion and of the CreateTable() call are
// returned with a prepended message. If waiting was requested, the call also
// waits for the table creation to finish, bounded by the same deadline.
Status KuduTableCreator::Create() {
  auto& opts = *data_;

  // Mandatory settings: table name, schema and some form of partitioning.
  if (opts.table_name_.length() == 0) {
    return Status::InvalidArgument("Missing table name");
  }
  if (!opts.schema_) {
    return Status::InvalidArgument("Missing schema");
  }
  if (!(opts.partition_schema_.has_range_schema() ||
        !opts.partition_schema_.hash_schema().empty())) {
    return Status::InvalidArgument(
        "Table partitioning must be specified using "
        "add_hash_partitions or set_range_partition_columns");
  }

  // Start filling in the request; optional settings are copied only if set.
  CreateTableRequestPB req;
  req.set_name(opts.table_name_);
  if (opts.num_replicas_) {
    req.set_num_replicas(*opts.num_replicas_);
  }
  if (opts.dimension_label_) {
    req.set_dimension_label(*opts.dimension_label_);
  }
  if (opts.extra_configs_) {
    req.mutable_extra_configs()->insert(opts.extra_configs_->begin(),
                                        opts.extra_configs_->end());
  }
  if (opts.owner_) {
    req.set_owner(*opts.owner_);
  }
  if (opts.comment_) {
    req.set_comment(*opts.comment_);
  }
  if (opts.allow_empty_partition_) {
    req.set_allow_empty_partition(*opts.allow_empty_partition_);
  }
  RETURN_NOT_OK_PREPEND(SchemaToPB(*opts.schema_->schema_, req.mutable_schema(),
                                   SCHEMA_PB_WITHOUT_WRITE_DEFAULT),
                        "Invalid schema");

  // Encode the range split rows, rejecting null entries.
  RowOperationsPBEncoder splits_encoder(req.mutable_split_rows_range_bounds());
  bool any_range_splits = false;
  for (const auto& split_row : opts.range_partition_splits_) {
    if (!split_row) {
      return Status::InvalidArgument("range split row must not be null");
    }
    any_range_splits = true;
    splits_encoder.Add(RowOperationsPB::SPLIT_ROW, *split_row);
  }

  // Find out whether at least one range carries its own hash schema instead
  // of the table-wide one.
  bool any_custom_hash_schema = false;
  for (const auto& partition : opts.range_partitions_) {
    if (partition->data_->is_table_wide_hash_schema_) {
      continue;
    }
    any_custom_hash_schema = true;
    break;
  }

  if (any_range_splits && any_custom_hash_schema) {
    // To keep things simple, range splits (deprecated) cannot be mixed with
    // custom hash bucket schemas per range partition.
    return Status::InvalidArgument(
        "split rows and custom hash bucket schemas for ranges are incompatible: "
        "choose one or the other");
  }

  auto* partition_schema_pb = req.mutable_partition_schema();
  partition_schema_pb->CopyFrom(opts.partition_schema_);
  for (const auto& partition : opts.range_partitions_) {
    const auto* range = partition->data_;
    if (!(range->lower_bound_ && range->upper_bound_)) {
      return Status::InvalidArgument("range bounds must not be null");
    }

    // Translate the client-side bound kinds into row operation types.
    const RowOperationsPB_Type lower_type =
        range->lower_bound_type_ != KuduTableCreator::INCLUSIVE_BOUND
        ? RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND
        : RowOperationsPB::RANGE_LOWER_BOUND;
    const RowOperationsPB_Type upper_type =
        range->upper_bound_type_ != KuduTableCreator::EXCLUSIVE_BOUND
        ? RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND
        : RowOperationsPB::RANGE_UPPER_BOUND;

    if (!any_custom_hash_schema) {
      // No range has a custom hash schema: the bounds go next to the split
      // rows in the request.
      splits_encoder.Add(lower_type, *range->lower_bound_);
      splits_encoder.Add(upper_type, *range->upper_bound_);
      continue;
    }

    // At least one range has a custom hash schema, so every range gets its
    // own PartitionSchemaPB::custom_hash_schema_ranges element: first the
    // bounds, then the hash schema for the range.
    auto* custom_range_pb = partition_schema_pb->add_custom_hash_schema_ranges();
    RowOperationsPBEncoder bounds_encoder(custom_range_pb->mutable_range_bounds());
    bounds_encoder.Add(lower_type, *range->lower_bound_);
    bounds_encoder.Add(upper_type, *range->upper_bound_);

    if (!range->is_table_wide_hash_schema_) {
      // Per-range custom hash bucket schema: copy each hash dimension.
      for (const auto& dimension : range->hash_schema_) {
        auto* dimension_pb = custom_range_pb->add_hash_schema();
        dimension_pb->set_seed(dimension.seed);
        dimension_pb->set_num_buckets(dimension.num_buckets);
        for (const auto& name : dimension.column_names) {
          dimension_pb->add_columns()->set_name(name);
        }
      }
    } else {
      // This range uses the table-wide hash schema, but because some other
      // range has a custom one, the table-wide schema is still spelled out
      // for this range to satisfy the convention used by the backend.
      custom_range_pb->mutable_hash_schema()->CopyFrom(
          opts.partition_schema_.hash_schema());
    }
  }

  // Stop scanning the columns as soon as an immutable one is found.
  bool any_immutable_column = false;
  for (size_t idx = 0;
       !any_immutable_column && idx < opts.schema_->num_columns();
       ++idx) {
    any_immutable_column = opts.schema_->Column(idx).is_immutable();
  }
  const bool any_auto_incrementing_column =
      opts.schema_->schema_->has_auto_incrementing();

  if (opts.table_type_) {
    req.set_table_type(*opts.table_type_);
  }

  // One deadline covers both the request and the optional wait below: use the
  // explicitly configured timeout if it is initialized, otherwise the
  // client's default timeout for admin operations.
  MonoTime deadline = MonoTime::Now();
  if (!opts.timeout_.Initialized()) {
    deadline += opts.client_->default_admin_operation_timeout();
  } else {
    deadline += opts.timeout_;
  }

  CreateTableResponsePB resp;
  RETURN_NOT_OK_PREPEND(
      opts.client_->data_->CreateTable(opts.client_,
                                       req,
                                       &resp,
                                       deadline,
                                       !opts.range_partitions_.empty(),
                                       any_custom_hash_schema,
                                       any_immutable_column,
                                       any_auto_incrementing_column),
      Substitute("Error creating table $0 on the master", opts.table_name_));

  if (!opts.wait_) {
    return Status::OK();
  }

  // Waiting was requested: spin until the table is fully created.
  TableIdentifierPB table_ident;
  table_ident.set_table_id(resp.table_id());
  RETURN_NOT_OK(opts.client_->data_->WaitForCreateTableToFinish(
      opts.client_, table_ident, deadline));
  return Status::OK();
}