Status CatalogManager::ApplyAlterPartitioningSteps()

in src/kudu/master/catalog_manager.cc [3075:3375]


// Applies the range-partitioning steps of an ALTER TABLE request.
//
// Every step in 'steps' must be either ADD_RANGE_PARTITION or
// DROP_RANGE_PARTITION (enforced with CHECK). For each step the range bounds
// are decoded from the client's encoding ('client_schema') into the table's
// schema as stored in the system catalog, and the partitions covering the
// range are computed and matched against the table's tablets.
//
// Parameters:
//   table           -- the table being altered; its current tablet map is used
//                      to detect conflicts and to find the tablets to drop.
//   client_schema   -- the schema the client used to encode range bounds and
//                      custom hash schemas.
//   steps           -- the range partitioning steps, applied in order.
//   l               -- lock on the table's metadata; must allow mutable_data()
//                      since the partition schema in its PB is updated when a
//                      range with a custom hash schema is added or dropped.
//   tablets_to_add  -- out: tablets created for the added ranges. Their
//                      metadata mutations are left open (neither committed nor
//                      aborted here). Only filled when OK is returned.
//   tablets_to_drop -- out: existing tablets covering the dropped ranges. May
//                      be partially filled if an error is returned.
//   partition_schema_updated -- out: set to true iff the partition schema in
//                      the PB of 'l' was modified. Only set when OK is returned.
//
// A range added and then dropped within the same call produces no tablets.
//
// Returns InvalidArgument for malformed range bounds, ranges overlapping other
// ranges, or dropping a non-existent range; AlreadyPresent when adding a range
// that already exists; NotSupported when a custom hash schema has a different
// number of hash dimensions than the table-wide one. On error, the metadata
// mutations of tablets created by this call are aborted, but the PB of 'l' may
// already have been modified.
Status CatalogManager::ApplyAlterPartitioningSteps(
    const scoped_refptr<TableInfo>& table,
    const Schema& client_schema,
    const vector<AlterTableRequestPB::Step>& steps,
    TableMetadataLock* l,
    vector<scoped_refptr<TabletInfo>>* tablets_to_add,
    vector<scoped_refptr<TabletInfo>>* tablets_to_drop,
    bool* partition_schema_updated) {
  DCHECK(l);
  DCHECK(tablets_to_add);
  DCHECK(tablets_to_drop);
  DCHECK(partition_schema_updated);

  // Get the table's schema as it's known to the catalog manager.
  Schema schema;
  RETURN_NOT_OK(SchemaFromPB(l->data().pb.schema(), &schema));
  // Build current PartitionSchema for the table. It is kept in sync with the
  // partition schema in the catalog PB while the steps below are applied, so
  // every step sees the ranges with custom hash schemas added by prior steps.
  PartitionSchema partition_schema;
  RETURN_NOT_OK(PartitionSchema::FromPB(
      l->data().pb.partition_schema(), schema, &partition_schema));
  TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
  TableInfo::TabletInfoMap new_tablets;
  // Tablets created by this call have an in-progress metadata mutation; abort
  // those mutations if this function returns early with an error.
  auto abort_mutations = MakeScopedCleanup([&new_tablets]() {
    for (const auto& e : new_tablets) {
      e.second->mutable_metadata()->AbortMutation();
    }
  });

  size_t partition_schema_updates = 0;
  for (const auto& step : steps) {
    CHECK(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION ||
          step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION);
    const auto& range_bounds =
        step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION
        ? step.add_range_partition().range_bounds()
        : step.drop_range_partition().range_bounds();
    RowOperationsPBDecoder decoder(&range_bounds, &client_schema, &schema, nullptr);
    vector<DecodedRowOperation> ops;
    RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));

    if (ops.size() != 2) {
      return Status::InvalidArgument(
          "expected two row operations for alter range partition step",
          SecureShortDebugString(step));
    }

    if ((ops[0].type != RowOperationsPB::RANGE_LOWER_BOUND &&
         ops[0].type != RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) ||
        (ops[1].type != RowOperationsPB::RANGE_UPPER_BOUND &&
         ops[1].type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND)) {
      return Status::InvalidArgument(
          "expected a lower bound and upper bound row op for alter range partition step",
          Substitute("$0, $1", ops[0].ToString(schema), ops[1].ToString(schema)));
    }

    // Normalize the bounds into the [inclusive, exclusive) form.
    if (ops[0].type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
      RETURN_NOT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(
            ops[0].split_row.get()));
    }
    if (ops[1].type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
      RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
            ops[1].split_row.get()));
    }

    vector<Partition> partitions;
    const pair<KuduPartialRow, KuduPartialRow> range_bound =
        { *ops[0].split_row, *ops[1].split_row };
    // Set for an ADD_RANGE_PARTITION step with a custom hash schema. The range
    // is recorded in the catalog's partition schema only after the new range
    // has been verified not to conflict with any other range (see below).
    bool add_custom_hash_schema_range = false;
    PartitionSchema::HashSchema custom_hash_schema;
    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
      if (!FLAGS_enable_per_range_hash_schemas ||
          !step.add_range_partition().has_custom_hash_schema()) {
        RETURN_NOT_OK(partition_schema.CreatePartitions(
            {}, { range_bound }, schema, &partitions));
      } else {
        const auto& custom_hash_schema_pb =
            step.add_range_partition().custom_hash_schema().hash_schema();
        const Schema client_schema_with_ids = client_schema.CopyWithColumnIds();
        RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
            client_schema_with_ids, custom_hash_schema_pb, &custom_hash_schema));
        if (partition_schema.hash_schema().size() != custom_hash_schema.size()) {
          return Status::NotSupported(
              "varying number of hash dimensions per range is not yet supported");
        }
        RETURN_NOT_OK(PartitionSchema::ValidateHashSchema(
            client_schema_with_ids, custom_hash_schema));
        RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
            range_bound, custom_hash_schema, client_schema_with_ids, &partitions));
        add_custom_hash_schema_range = true;
      }
    } else {
      DCHECK_EQ(AlterTableRequestPB::DROP_RANGE_PARTITION, step.type());
      if (!FLAGS_enable_per_range_hash_schemas ||
          !partition_schema.HasCustomHashSchemas()) {
        RETURN_NOT_OK(partition_schema.CreatePartitions(
            {}, { range_bound }, schema, &partitions));
      } else {
        const Schema client_schema_with_ids = client_schema.CopyWithColumnIds();
        PartitionSchema::HashSchema range_hash_schema;
        RETURN_NOT_OK(partition_schema.GetHashSchemaForRange(
            range_bound.first, client_schema_with_ids, &range_hash_schema));
        RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
            range_bound, range_hash_schema, client_schema_with_ids, &partitions));

        // Update the partition schema information to be stored in the system
        // catalog table. The information on a range with the table-wide hash
        // schema must not be present in the PartitionSchemaPB that the system
        // catalog stores, so this is necessary only if the range has custom
        // (i.e. other than the table-wide) hash schema.
        if (range_hash_schema != partition_schema.hash_schema()) {
          RETURN_NOT_OK(partition_schema.DropRange(
              range_bound.first, range_bound.second, client_schema_with_ids));
          PartitionSchemaPB ps_pb;
          RETURN_NOT_OK(partition_schema.ToPB(client_schema_with_ids, &ps_pb));
          auto* catalog_ps_pb = l->mutable_data()->pb.mutable_partition_schema();
          // Make sure exactly one range is gone, comparing against the very
          // PB that is about to be overwritten.
          DCHECK_EQ(ps_pb.custom_hash_schema_ranges_size() + 1,
                    catalog_ps_pb->custom_hash_schema_ranges_size());
          *catalog_ps_pb = std::move(ps_pb);
          ++partition_schema_updates;
        }
      }
    }

    switch (step.type()) {
      case AlterTableRequestPB::ADD_RANGE_PARTITION: {
        // The dimension label is a per-step property: compute it once.
        const optional<string> dimension_label =
            step.add_range_partition().has_dimension_label()
                ? make_optional(step.add_range_partition().dimension_label())
                : nullopt;
        for (const Partition& partition : partitions) {
          const auto& lower_bound = partition.begin();
          const auto& upper_bound = partition.end();

          // Check that the new tablet does not overlap with any of the existing
          // tablets. Since the elements of 'existing_tablets' are ordered by
          // the tablets' lower bounds, the iterator points at the tablet
          // directly *after* the lower bound or to existing_tablets.end()
          // if such a tablet does not exist.
          const auto existing_iter = existing_tablets.upper_bound(lower_bound);
          if (existing_iter != existing_tablets.end()) {
            TabletMetadataLock metadata(existing_iter->second.get(),
                                        LockMode::READ);
            const auto& p = metadata.data().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            // Check for the overlapping ranges.
            if (upper_bound.empty() || p_begin < upper_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with existing one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }
          // This is the case when there is an existing tablet with the lower
          // bound being less or equal to the lower bound of the new tablet to
          // create. This cannot be the case of an empty 'existing_tablets'
          // container (otherwise, existing_tablets.end() would be equal to
          // existing_tablets.begin()), so it's safe to decrement the iterator
          // (i.e. call std::prev() on it) and de-reference it.
          if (existing_iter != existing_tablets.begin()) {
            TabletMetadataLock metadata(std::prev(existing_iter)->second.get(),
                                        LockMode::READ);
            const auto& p = metadata.data().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            // Check for the exact match of ranges.
            if (lower_bound == p_begin && upper_bound == p_end) {
              return Status::AlreadyPresent(
                  "range partition already exists",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
            // Check for the overlapping ranges.
            if (p_end.empty() || p_end > lower_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with existing one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }

          // Check that the new tablet doesn't overlap with any other new tablets.
          auto new_iter = new_tablets.upper_bound(lower_bound);
          if (new_iter != new_tablets.end()) {
            // Check for the overlapping ranges.
            const auto& p = new_iter->second->mutable_metadata()->dirty().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            if (upper_bound.empty() || p_begin < upper_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with another newly added one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }
          if (new_iter != new_tablets.begin()) {
            const auto& p = std::prev(new_iter)->second->mutable_metadata()->dirty().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            // Check for the exact match of ranges.
            if (lower_bound == p_begin && upper_bound == p_end) {
              return Status::AlreadyPresent(
                  "new range partition duplicates another newly added one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
            // Check for the overlapping ranges.
            if (p_end.empty() || p_end > lower_bound) {
              return Status::InvalidArgument(
                  "new range partition conflicts with another newly added one",
                  partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                             *ops[1].split_row));
            }
          }

          PartitionPB partition_pb;
          partition.ToPB(&partition_pb);
          new_tablets.emplace(lower_bound,
                              CreateTabletInfo(table, partition_pb, dimension_label));
        }

        if (add_custom_hash_schema_range) {
          // Add information on the new range with custom hash schema into the
          // PartitionSchema for the table stored in the system catalog. This is
          // done only after all the conflict checks above have passed.
          auto* p = l->mutable_data()->pb.mutable_partition_schema();
          auto* range = p->add_custom_hash_schema_ranges();
          RowOperationsPBEncoder encoder(range->mutable_range_bounds());
          encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, range_bound.first);
          encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, range_bound.second);
          for (const auto& hash_dimension : custom_hash_schema) {
            auto* hash_dimension_pb = range->add_hash_schema();
            hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
            hash_dimension_pb->set_seed(hash_dimension.seed);
            for (const auto& column_id : hash_dimension.column_ids) {
              hash_dimension_pb->add_columns()->set_id(column_id);
            }
          }
          // Keep the in-memory partition schema in sync with the catalog PB.
          // Otherwise a later step of this request dropping this very range
          // would compute its partitions with the table-wide hash schema, and
          // a later step dropping another range with a custom hash schema would
          // rebuild the PB from a stale 'partition_schema', losing this range.
          RETURN_NOT_OK(PartitionSchema::FromPB(*p, schema, &partition_schema));
          ++partition_schema_updates;
        }
        break;
      }

      case AlterTableRequestPB::DROP_RANGE_PARTITION: {
        for (const Partition& partition : partitions) {
          const auto& lower_bound = partition.begin();
          const auto& upper_bound = partition.end();

          // Iter points to the tablet if it exists, or the next tablet, or the end.
          auto existing_iter = existing_tablets.lower_bound(lower_bound);
          auto new_iter = new_tablets.lower_bound(lower_bound);

          bool found_existing = false;
          bool found_new = false;

          if (existing_iter != existing_tablets.end()) {
            TabletMetadataLock metadata(existing_iter->second.get(), LockMode::READ);
            const auto& p = metadata.data().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            found_existing = p_begin == lower_bound && p_end == upper_bound;
          }
          if (new_iter != new_tablets.end()) {
            const auto& p = new_iter->second->mutable_metadata()->dirty().pb.partition();
            const auto p_begin = Partition::StringToPartitionKey(
                p.partition_key_start(), p.hash_buckets_size());
            const auto p_end = Partition::StringToPartitionKey(
                p.partition_key_end(), p.hash_buckets_size());
            found_new = p_begin == lower_bound && p_end == upper_bound;
          }

          DCHECK(!found_existing || !found_new);
          if (found_existing) {
            tablets_to_drop->emplace_back(existing_iter->second);
            existing_tablets.erase(existing_iter);
          } else if (found_new) {
            // The tablet was created by an earlier step of this request:
            // cancel its creation instead of scheduling a drop.
            new_iter->second->mutable_metadata()->AbortMutation();
            new_tablets.erase(new_iter);
          } else {
            return Status::InvalidArgument("no range partition to drop",
                partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                           *ops[1].split_row));
          }
        }
        break;
      }
      default: {
        return Status::InvalidArgument("unknown alter table range partitioning step",
                                       SecureShortDebugString(step));
      }
    }
  }

  for (auto& tablet : new_tablets) {
    tablets_to_add->emplace_back(std::move(tablet.second));
  }
  abort_mutations.cancel();
  *partition_schema_updated = partition_schema_updates > 0;
  return Status::OK();
}