in src/kudu/master/catalog_manager.cc [3075:3375]
// Applies the ADD_RANGE_PARTITION / DROP_RANGE_PARTITION steps of an
// AlterTable request, in order, to the table 'table' whose metadata is
// locked by 'l'.
//
// Parameters:
//   table                    - the table being altered
//   client_schema            - the schema used by the client to encode the
//                              range bounds in 'steps'
//   steps                    - range partitioning steps; only ADD/DROP range
//                              partition steps are expected (CHECKed below)
//   l                        - the table's metadata lock; the
//                              PartitionSchemaPB in l->mutable_data() is
//                              updated when a range with a custom hash schema
//                              is added or dropped
//   tablets_to_add           - [out] tablets created for the added ranges
//   tablets_to_drop          - [out] existing tablets of the dropped ranges
//   partition_schema_updated - [out] set to true iff the PartitionSchemaPB
//                              in 'l' has been modified
//
// On any error, the metadata mutations of the tablets created by this call
// are aborted (see 'abort_mutations' below).
Status CatalogManager::ApplyAlterPartitioningSteps(
    const scoped_refptr<TableInfo>& table,
    const Schema& client_schema,
    const vector<AlterTableRequestPB::Step>& steps,
    TableMetadataLock* l,
    vector<scoped_refptr<TabletInfo>>* tablets_to_add,
    vector<scoped_refptr<TabletInfo>>* tablets_to_drop,
    bool* partition_schema_updated) {
  DCHECK(l);
  DCHECK(tablets_to_add);
  DCHECK(tablets_to_drop);
  DCHECK(partition_schema_updated);
  // Get the table's schema as it's known to the catalog manager. This schema
  // carries the table's actual column IDs, and it's the schema the partition
  // schema and the decoded range bounds below are built against, so it's used
  // throughout this method, including for custom hash schemas. A copy of
  // 'client_schema' with freshly assigned column IDs would not match the
  // table's column IDs once columns have been dropped or added.
  Schema schema;
  RETURN_NOT_OK(SchemaFromPB(l->data().pb.schema(), &schema));
  // Build current PartitionSchema for the table.
  PartitionSchema partition_schema;
  RETURN_NOT_OK(PartitionSchema::FromPB(
      l->data().pb.partition_schema(), schema, &partition_schema));
  // A working copy of the table's tablets: tablets of dropped ranges are
  // erased from it, so later steps of this request see the current state.
  TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
  // Tablets created by the ADD_RANGE_PARTITION steps of this request.
  TableInfo::TabletInfoMap new_tablets;
  auto abort_mutations = MakeScopedCleanup([&new_tablets]() {
    for (const auto& e : new_tablets) {
      e.second->mutable_metadata()->AbortMutation();
    }
  });
  size_t partition_schema_updates = 0;
  for (const auto& step : steps) {
    CHECK(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION ||
          step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION);
    const auto& range_bounds =
        step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION
            ? step.add_range_partition().range_bounds()
            : step.drop_range_partition().range_bounds();
    RowOperationsPBDecoder decoder(&range_bounds, &client_schema, &schema, nullptr);
    vector<DecodedRowOperation> ops;
    RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
    if (ops.size() != 2) {
      return Status::InvalidArgument(
          "expected two row operations for alter range partition step",
          SecureShortDebugString(step));
    }
    if ((ops[0].type != RowOperationsPB::RANGE_LOWER_BOUND &&
         ops[0].type != RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) ||
        (ops[1].type != RowOperationsPB::RANGE_UPPER_BOUND &&
         ops[1].type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND)) {
      return Status::InvalidArgument(
          "expected a lower bound and upper bound row op for alter range partition step",
          Substitute("$0, $1", ops[0].ToString(schema), ops[1].ToString(schema)));
    }
    // Normalize the bounds to the [inclusive lower, exclusive upper) form.
    if (ops[0].type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
      RETURN_NOT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(
          ops[0].split_row.get()));
    }
    if (ops[1].type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
      RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
          ops[1].split_row.get()));
    }
    vector<Partition> partitions;
    const pair<KuduPartialRow, KuduPartialRow> range_bound =
        { *ops[0].split_row, *ops[1].split_row };
    // Set when this step adds a range with a custom hash schema. Such a range
    // is recorded in the table's PartitionSchemaPB (and in 'partition_schema')
    // only after verifying it doesn't conflict with any other range.
    bool add_custom_hash_schema_range = false;
    PartitionSchema::HashSchema custom_hash_schema;
    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
      if (!FLAGS_enable_per_range_hash_schemas ||
          !step.add_range_partition().has_custom_hash_schema()) {
        RETURN_NOT_OK(partition_schema.CreatePartitions(
            {}, { range_bound }, schema, &partitions));
      } else {
        const auto& custom_hash_schema_pb =
            step.add_range_partition().custom_hash_schema().hash_schema();
        RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
            schema, custom_hash_schema_pb, &custom_hash_schema));
        if (partition_schema.hash_schema().size() != custom_hash_schema.size()) {
          return Status::NotSupported(
              "varying number of hash dimensions per range is not yet supported");
        }
        RETURN_NOT_OK(PartitionSchema::ValidateHashSchema(schema, custom_hash_schema));
        RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
            range_bound, custom_hash_schema, schema, &partitions));
        add_custom_hash_schema_range = true;
      }
    } else {
      DCHECK_EQ(AlterTableRequestPB::DROP_RANGE_PARTITION, step.type());
      if (!FLAGS_enable_per_range_hash_schemas ||
          !partition_schema.HasCustomHashSchemas()) {
        RETURN_NOT_OK(partition_schema.CreatePartitions(
            {}, { range_bound }, schema, &partitions));
      } else {
        PartitionSchema::HashSchema range_hash_schema;
        RETURN_NOT_OK(partition_schema.GetHashSchemaForRange(
            range_bound.first, schema, &range_hash_schema));
        RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
            range_bound, range_hash_schema, schema, &partitions));
        // Update the partition schema information to be stored in the system
        // catalog table. The information on a range with the table-wide hash
        // schema must not be present in the PartitionSchemaPB that the system
        // catalog stores, so this is necessary only if the range has custom
        // (i.e. other than the table-wide) hash schema.
        if (range_hash_schema != partition_schema.hash_schema()) {
          RETURN_NOT_OK(partition_schema.DropRange(
              range_bound.first, range_bound.second, schema));
          PartitionSchemaPB ps_pb;
          RETURN_NOT_OK(partition_schema.ToPB(schema, &ps_pb));
          auto* stored_ps_pb = l->mutable_data()->pb.mutable_partition_schema();
          // Make sure exactly one range is gone from the PB being replaced.
          DCHECK_EQ(ps_pb.custom_hash_schema_ranges_size() + 1,
                    stored_ps_pb->custom_hash_schema_ranges_size());
          *stored_ps_pb = std::move(ps_pb);
          ++partition_schema_updates;
        }
      }
    }
    switch (step.type()) {
      case AlterTableRequestPB::ADD_RANGE_PARTITION: {
        // The dimension label is the same for all tablets of the new range.
        const optional<string> dimension_label =
            step.add_range_partition().has_dimension_label()
                ? make_optional(step.add_range_partition().dimension_label())
                : nullopt;
        for (const Partition& partition : partitions) {
          const auto& lower_bound = partition.begin();
          const auto& upper_bound = partition.end();
          // Check that the new tablet does not overlap with any of the existing
          // tablets. Since the elements of 'existing_tablets' are ordered by
          // the tablets' lower bounds, the iterator points at the first tablet
          // whose lower bound is strictly greater than 'lower_bound', or to
          // existing_tablets.end() if such a tablet does not exist.
          const auto existing_iter = existing_tablets.upper_bound(lower_bound);
          if (existing_iter != existing_tablets.end()) {
            TabletMetadataLock metadata(existing_iter->second.get(),
                                        LockMode::READ);
            const auto& p = metadata.data().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            // Check for the overlapping ranges.
            if (upper_bound.empty() || p_begin < upper_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with existing one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }
          // This is the case when there is an existing tablet with the lower
          // bound being less or equal to the lower bound of the new tablet to
          // create. An iterator different from begin() implies the container
          // is non-empty, so it's safe to decrement the iterator (i.e. call
          // std::prev() on it) and de-reference it.
          if (existing_iter != existing_tablets.begin()) {
            TabletMetadataLock metadata(std::prev(existing_iter)->second.get(),
                                        LockMode::READ);
            const auto& p = metadata.data().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            // Check for the exact match of ranges.
            if (lower_bound == p_begin && upper_bound == p_end) {
              return Status::AlreadyPresent(
                  "range partition already exists",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
            // Check for the overlapping ranges.
            if (p_end.empty() || p_end > lower_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with existing one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }
          // Check that the new tablet doesn't overlap with any other new tablets.
          auto new_iter = new_tablets.upper_bound(lower_bound);
          if (new_iter != new_tablets.end()) {
            // Check for the overlapping ranges.
            const auto& p = new_iter->second->mutable_metadata()->dirty().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            if (upper_bound.empty() || p_begin < upper_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with another newly added one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }
          if (new_iter != new_tablets.begin()) {
            const auto& p = std::prev(new_iter)->second->mutable_metadata()->dirty().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            // Check for the exact match of ranges.
            if (lower_bound == p_begin && upper_bound == p_end) {
              return Status::AlreadyPresent(
                  "new range partition duplicates another newly added one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
            // Check for the overlapping ranges.
            if (p_end.empty() || p_end > lower_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with another newly added one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }
          PartitionPB partition_pb;
          partition.ToPB(&partition_pb);
          new_tablets.emplace(lower_bound,
                              CreateTabletInfo(table, partition_pb, dimension_label));
        }
        if (add_custom_hash_schema_range) {
          // Add information on the new range with custom hash schema into the
          // PartitionSchema for the table stored in the system catalog.
          auto* stored_ps_pb = l->mutable_data()->pb.mutable_partition_schema();
          auto* range = stored_ps_pb->add_custom_hash_schema_ranges();
          RowOperationsPBEncoder encoder(range->mutable_range_bounds());
          encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, range_bound.first);
          encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, range_bound.second);
          for (const auto& hash_dimension : custom_hash_schema) {
            auto* hash_dimension_pb = range->add_hash_schema();
            hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
            hash_dimension_pb->set_seed(hash_dimension.seed);
            for (const auto& column_id : hash_dimension.column_ids) {
              hash_dimension_pb->add_columns()->set_id(column_id);
            }
          }
          // Keep the in-memory partition schema in sync with the PB: later
          // steps of this request must see the new range, e.g. when dropping
          // it again, or when dropping another range with custom hash schema
          // (which replaces the whole stored PartitionSchemaPB with the
          // result of partition_schema.ToPB()).
          PartitionSchema updated_partition_schema;
          RETURN_NOT_OK(PartitionSchema::FromPB(
              *stored_ps_pb, schema, &updated_partition_schema));
          partition_schema = std::move(updated_partition_schema);
          ++partition_schema_updates;
        }
        break;
      }
      case AlterTableRequestPB::DROP_RANGE_PARTITION: {
        for (const Partition& partition : partitions) {
          const auto& lower_bound = partition.begin();
          const auto& upper_bound = partition.end();
          // Iter points to the tablet if it exists, or the next tablet, or the end.
          auto existing_iter = existing_tablets.lower_bound(lower_bound);
          auto new_iter = new_tablets.lower_bound(lower_bound);
          bool found_existing = false;
          bool found_new = false;
          if (existing_iter != existing_tablets.end()) {
            TabletMetadataLock metadata(existing_iter->second.get(), LockMode::READ);
            const auto& p = metadata.data().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            found_existing = p_begin == lower_bound && p_end == upper_bound;
          }
          if (new_iter != new_tablets.end()) {
            const auto& p = new_iter->second->mutable_metadata()->dirty().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            found_new = p_begin == lower_bound && p_end == upper_bound;
          }
          DCHECK(!found_existing || !found_new);
          if (found_existing) {
            tablets_to_drop->emplace_back(existing_iter->second);
            existing_tablets.erase(existing_iter);
          } else if (found_new) {
            // The tablet was created by an earlier step of this request:
            // abort its pending metadata mutation and forget about it.
            new_iter->second->mutable_metadata()->AbortMutation();
            new_tablets.erase(new_iter);
          } else {
            return Status::InvalidArgument("no range partition to drop",
                partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                           *ops[1].split_row));
          }
        }
        break;
      }
      default: {
        return Status::InvalidArgument("unknown alter table range partitioning step",
                                       SecureShortDebugString(step));
      }
    }
  }
  for (auto& tablet : new_tablets) {
    tablets_to_add->emplace_back(std::move(tablet.second));
  }
  abort_mutations.cancel();
  *partition_schema_updated = partition_schema_updates > 0;
  return Status::OK();
}