Status CreateDstTableIfNeeded()

in src/kudu/tools/table_scanner.cc [528:728]


// Makes sure a destination table named 'dst_table_name' is usable as the copy
// target for 'src_table' in the cluster reachable through 'dst_client'.
//
// - If the destination table already exists, it must not be the source table
//   itself (Status::AlreadyPresent otherwise), and its schema must pass
//   SchemasMatch() against the source schema.
// - If it does not exist and --create_table is false, Status::NotFound is
//   returned.
// - Otherwise the table is created from the source table's schema, hash
//   partitioning, range partition columns and range bounds. The replication
//   factor may be overridden by --create_table_replication_factor (-1 keeps the
//   source value). The per-dimension hash bucket counts may be overridden by
//   --create_table_hash_bucket_nums. After creation the new table's schema is
//   re-checked with SchemasMatch().
//
// Returns Status::OK() on success, or the first error encountered.
Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table,
                              const client::sp::shared_ptr<KuduClient>& dst_client,
                              const string& dst_table_name) {
  client::sp::shared_ptr<KuduTable> dst_table;
  auto s = dst_client->OpenTable(dst_table_name, &dst_table);
  if (!s.IsNotFound() && !s.ok()) {
    return s;
  }

  const Schema src_schema_internal = KuduSchema::ToSchema(src_table->schema());

  // Destination table exists: it must be a different table with a matching schema.
  if (s.ok()) {
    if (src_table->id() == dst_table->id()) {
      return Status::AlreadyPresent("Destination table is the same as the source table.");
    }
    RETURN_NOT_OK(SchemasMatch(src_schema_internal,
                               KuduSchema::ToSchema(dst_table->schema())));
    return Status::OK();
  }

  // The destination table does NOT exist.
  if (!FLAGS_create_table) {
    return Status::NotFound(Substitute("Table $0 does not exist in the destination cluster.",
                                       dst_table_name));
  }

  // Construct the destination table schema.
  //
  // Every hole in the source's internal column IDs is filled with a dummy
  // column. This appears intended to keep the destination's column IDs in line
  // with the source's: convert_column_ids_to_names() below resolves the
  // source's partition column IDs against the destination schema.
  // 'to_delete_columns' records the dummy columns so they can be dropped after
  // the table has been created.
  vector<string> to_delete_columns;
  static ObjectIdGenerator oid_generator;
  SchemaBuilder builder;
  int32_t expect_column_id = src_schema_internal.column_id(0);
  for (size_t idx = 0; idx < src_schema_internal.num_columns();) {
    const int32_t actual_column_id = src_schema_internal.column_id(idx);
    if (expect_column_id == actual_column_id) {
      // Construct the destination column schema according to the source column for continuous
      // column id.
      RETURN_NOT_OK(builder.AddColumn(src_schema_internal.column(idx),
                                      src_schema_internal.is_key_column(idx)));
      VLOG(1) << Substitute("Add a real column $0 for column id $1",
                            src_schema_internal.column(idx).ToString(),
                            actual_column_id);
      // The expected column id is continuous.
      ++expect_column_id;
      ++idx;
    } else {
      // When there are column id holes, the expected column id must be less than the actual
      // column id.
      if (PREDICT_FALSE(expect_column_id >= actual_column_id)) {
        return Status::Corruption(
            Substitute("The internal column IDs must be monotonically increasing, but we got $0 "
                       "while expecting $1.",
                       actual_column_id, expect_column_id));
      }
      // Fill the hole with dummy columns.
      while (expect_column_id < actual_column_id) {
        auto dummy_column_name = "dummy_" + oid_generator.Next();
        RETURN_NOT_OK(builder.AddColumn(dummy_column_name, DataType::INT8));
        VLOG(1) << Substitute("Add a dummy column $0 for column id $1",
                              dummy_column_name, expect_column_id);
        // The dummy columns will be dropped after the table is created.
        to_delete_columns.emplace_back(dummy_column_name);
        ++expect_column_id;
      }
    }
  }

  const Schema dst_schema_internal = builder.Build();
  const auto& partition_schema = src_table->partition_schema();
  const auto& hash_schema = partition_schema.hash_schema();

  // Maps column IDs (taken from the source partition schema) to the names of
  // the corresponding columns in the destination schema.
  auto convert_column_ids_to_names = [&dst_schema_internal] (const vector<ColumnId>& column_ids) {
    vector<string> column_names;
    column_names.reserve(column_ids.size());
    for (const auto& column_id : column_ids) {
      column_names.emplace_back(dst_schema_internal.column_by_id(column_id).name());
    }
    return column_names;
  };

  // Table schema and replica number.
  const int num_replicas = FLAGS_create_table_replication_factor == -1 ?
      src_table->num_replicas() : FLAGS_create_table_replication_factor;
  const KuduSchema dst_table_schema = KuduSchema::FromSchema(dst_schema_internal);
  unique_ptr<KuduTableCreator> table_creator(dst_client->NewTableCreator());
  table_creator->table_name(dst_table_name)
      .schema(&dst_table_schema)
      .num_replicas(num_replicas);

  // Number of hash buckets for each hash dimension, in hash-schema order.
  // A value of -1 means "keep the source table's bucket count".
  vector<int> hash_bucket_nums;
  if (hash_schema.empty()) {
    if (!FLAGS_create_table_hash_bucket_nums.empty()) {
      return Status::InvalidArgument("There are no hash partitions defined in this table.");
    }
  } else {
    const vector<string> hash_bucket_nums_str = Split(FLAGS_create_table_hash_bucket_nums,
                                                      ",", strings::SkipEmpty());
    if (hash_bucket_nums_str.empty()) {
      // --create_table_hash_bucket_nums is not set: keep the source bucket counts.
      hash_bucket_nums.assign(hash_schema.size(), -1);
    } else {
      // If the --create_table_hash_bucket_nums flag is set, the number
      // of comma-separated elements must be equal to the number of hash schema dimensions.
      if (hash_schema.size() != hash_bucket_nums_str.size()) {
        return Status::InvalidArgument("The count of hash bucket numbers must be equal to the "
                                       "number of hash schema dimensions.");
      }
      hash_bucket_nums.reserve(hash_bucket_nums_str.size());
      for (const auto& bucket_num_str : hash_bucket_nums_str) {
        int bucket_num = 0;
        if (!safe_strto32(bucket_num_str, &bucket_num)) {
          return Status::InvalidArgument(Substitute("'$0': cannot parse the number "
                                                    "of hash buckets.",
                                                    bucket_num_str));
        }
        if (bucket_num < 2) {
          return Status::InvalidArgument("The number of hash buckets must not be less than 2.");
        }
        hash_bucket_nums.push_back(bucket_num);
      }
    }
  }

  // Add hash partition schema, one dimension at a time.
  size_t dim_idx = 0;
  for (const auto& hash_dimension : hash_schema) {
    const int num_buckets = hash_bucket_nums[dim_idx] != -1 ? hash_bucket_nums[dim_idx]
                                                            : hash_dimension.num_buckets;
    const auto hash_columns = convert_column_ids_to_names(hash_dimension.column_ids);
    table_creator->add_hash_partitions(hash_columns,
                                       num_buckets,
                                       static_cast<int32_t>(hash_dimension.seed));
    ++dim_idx;
  }

  // Add range partition schema.
  if (!partition_schema.range_schema().column_ids.empty()) {
    const auto range_columns
      = convert_column_ids_to_names(partition_schema.range_schema().column_ids);
    table_creator->set_range_partition_columns(range_columns);
  }

  // Add range bounds for each range partition.
  //
  // DecodeRangeKey() is handed 'arena' for its allocations, so decoded cells
  // (e.g. variable-length STRING/BINARY values) may point into arena memory
  // that the partial rows do not own. Those rows are handed to
  // 'table_creator' and are only read when Create() builds the request below,
  // so the arena must outlive Create(). A per-iteration arena would leave
  // those bounds dangling.
  Arena arena(256);
  vector<Partition> partitions;
  RETURN_NOT_OK(src_table->ListPartitions(&partitions));
  for (const auto& partition : partitions) {
    // Deduplicate by hash bucket to get a unique entry per range partition.
    const auto& hash_buckets = partition.hash_buckets();
    if (!std::all_of(hash_buckets.begin(),
                     hash_buckets.end(),
                     [](int32_t bucket) { return bucket == 0; })) {
      continue;
    }

    // Partitions are considered metadata, so don't redact them.
    const ScopedDisableRedaction no_redaction;

    std::unique_ptr<KuduPartialRow> lower(new KuduPartialRow(&dst_schema_internal));
    std::unique_ptr<KuduPartialRow> upper(new KuduPartialRow(&dst_schema_internal));
    Slice range_key_start(partition.begin().range_key());
    Slice range_key_end(partition.end().range_key());
    RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_start, lower.get(), &arena));
    RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_end, upper.get(), &arena));

    // 'table_creator' takes ownership of both bound rows.
    table_creator->add_range_partition(lower.release(), upper.release());
  }

  if (partition_schema.range_schema().column_ids.empty()) {
    // This src table is unpartitioned, just create a table range partitioned on no columns.
    table_creator->set_range_partition_columns({});
  }

  table_creator->set_allow_empty_partition(true);

  // Create table.
  RETURN_NOT_OK(table_creator->Create());

  // Drop the dummy columns that were only needed to fill column ID holes.
  if (!to_delete_columns.empty()) {
    unique_ptr<client::KuduTableAlterer> alterer(dst_client->NewTableAlterer(dst_table_name));
    for (const auto& to_delete_column : to_delete_columns) {
      VLOG(1) << Substitute("Drop dummy column $0", to_delete_column);
      alterer->DropColumn(to_delete_column);
    }
    RETURN_NOT_OK(alterer->Alter());
  }

  // Check that the schemas match.
  RETURN_NOT_OK(dst_client->OpenTable(dst_table_name, &dst_table));
  RETURN_NOT_OK(SchemasMatch(src_schema_internal,
                             KuduSchema::ToSchema(dst_table->schema())));

  LOG(INFO) << "Table " << dst_table_name << " created successfully";

  return Status::OK();
}