in src/kudu/tools/table_scanner.cc [528:728]
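// Create the table named 'dst_table_name' in the destination cluster if it does not exist yet,
// mirroring the source table's schema, replication factor, and partitioning. Returns
// AlreadyPresent if the destination table turns out to be the same table as the source, and
// NotFound if the table is missing and --create_table is not set.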
Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table,
const client::sp::shared_ptr<KuduClient>& dst_client,
const string& dst_table_name) {
client::sp::shared_ptr<KuduTable> dst_table;
auto s = dst_client->OpenTable(dst_table_name, &dst_table);
if (!s.IsNotFound() && !s.ok()) {
return s;
}
  const Schema src_schema_internal = KuduSchema::ToSchema(src_table->schema());
  // The destination table already exists.
  if (s.ok()) {
if (src_table->id() == dst_table->id()) {
return Status::AlreadyPresent("Destination table is the same as the source table.");
}
RETURN_NOT_OK(SchemasMatch(src_schema_internal,
KuduSchema::ToSchema(dst_table->schema())));
return Status::OK();
}
// The destination table does NOT exist.
if (!FLAGS_create_table) {
return Status::NotFound(Substitute("Table $0 does not exist in the destination cluster.",
dst_table_name));
}
  // Construct the destination table schema.
  //
  // If the source table's schema has holes in its column IDs (e.g. because columns were
  // dropped), dummy columns are added to fill the holes so the destination schema's column IDs
  // line up with the source's. 'to_delete_columns' records those dummy columns so they can be
  // dropped once the table has been created.
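  // For example (an illustrative case, not drawn from any particular table): if the source
  // schema has column IDs {0, 1, 3} because the column with ID 2 was dropped, a dummy INT8
  // column is added for ID 2 and dropped again right after the destination table is created.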
vector<string> to_delete_columns;
static ObjectIdGenerator oid_generator;
SchemaBuilder builder;
int32_t expect_column_id = src_schema_internal.column_id(0);
for (size_t idx = 0; idx < src_schema_internal.num_columns();) {
const int32_t actual_column_id = src_schema_internal.column_id(idx);
if (expect_column_id == actual_column_id) {
      // The column ID matches the expected one: copy the source column schema into the
      // destination schema as-is.
RETURN_NOT_OK(builder.AddColumn(src_schema_internal.column(idx),
src_schema_internal.is_key_column(idx)));
VLOG(1) << Substitute("Add a real column $0 for column id $1",
src_schema_internal.column(idx).ToString(),
actual_column_id);
      // Advance to the next expected (contiguous) column ID and the next source column.
++expect_column_id;
++idx;
} else {
      // There is a hole in the column IDs: the expected column ID must be strictly less than
      // the actual one, since column IDs are assigned in monotonically increasing order.
if (PREDICT_FALSE(expect_column_id >= actual_column_id)) {
return Status::Corruption(
Substitute("The internal column IDs must be monotonically increasing, but we got $0 "
"while expecting $1.",
actual_column_id, expect_column_id));
}
// Fill the hole with dummy columns.
while (expect_column_id < actual_column_id) {
auto dummy_column_name = "dummy_" + oid_generator.Next();
RETURN_NOT_OK(builder.AddColumn(dummy_column_name, DataType::INT8));
VLOG(1) << Substitute("Add a dummy column $0 for column id $1",
dummy_column_name, expect_column_id);
// The dummy columns will be dropped after the table is created.
to_delete_columns.emplace_back(dummy_column_name);
++expect_column_id;
}
}
}
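  // Note: SchemaBuilder assigns column IDs sequentially, so the dummy columns added above keep
  // the destination schema's column IDs aligned with the source table's; the lookups by source
  // column ID in convert_column_ids_to_names() below depend on that alignment.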
const Schema dst_schema_internal = builder.Build();
const auto& partition_schema = src_table->partition_schema();
auto convert_column_ids_to_names = [&dst_schema_internal] (const vector<ColumnId>& column_ids) {
vector<string> column_names;
column_names.reserve(column_ids.size());
for (const auto& column_id : column_ids) {
column_names.emplace_back(dst_schema_internal.column_by_id(column_id).name());
}
return column_names;
};
  // Set the table schema and the replication factor; --create_table_replication_factor=-1
  // means "use the same replication factor as the source table".
const int num_replicas = FLAGS_create_table_replication_factor == -1 ?
src_table->num_replicas() : FLAGS_create_table_replication_factor;
const KuduSchema dst_table_schema = KuduSchema::FromSchema(dst_schema_internal);
unique_ptr<KuduTableCreator> table_creator(dst_client->NewTableCreator());
table_creator->table_name(dst_table_name)
.schema(&dst_table_schema)
.num_replicas(num_replicas);
// Add hash partition schema.
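  // When set, --create_table_hash_bucket_nums must be a comma-separated list with one entry per
  // hash dimension of the source table, e.g. --create_table_hash_bucket_nums=4,8 for a table
  // hash-partitioned on two dimensions (the values in this example are purely illustrative).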
vector<int> hash_bucket_nums;
if (!partition_schema.hash_schema().empty()) {
vector<string> hash_bucket_nums_str = Split(FLAGS_create_table_hash_bucket_nums,
",", strings::SkipEmpty());
    // If --create_table_hash_bucket_nums is not set, use -1 as a placeholder so that the
    // source table's bucket count is reused for each hash dimension below.
if (hash_bucket_nums_str.empty()) {
      for (size_t i = 0; i < partition_schema.hash_schema().size(); i++) {
hash_bucket_nums.push_back(-1);
}
} else {
// If the --create_table_hash_bucket_nums flag is set, the number
// of comma-separated elements must be equal to the number of hash schema dimensions.
if (partition_schema.hash_schema().size() != hash_bucket_nums_str.size()) {
return Status::InvalidArgument("The count of hash bucket numbers must be equal to the "
"number of hash schema dimensions.");
}
      for (size_t i = 0; i < hash_bucket_nums_str.size(); i++) {
int bucket_num = 0;
const bool is_number = safe_strto32(hash_bucket_nums_str[i], &bucket_num);
if (!is_number) {
return Status::InvalidArgument(Substitute("'$0': cannot parse the number "
"of hash buckets.",
hash_bucket_nums_str[i]));
}
if (bucket_num < 2) {
return Status::InvalidArgument("The number of hash buckets must not be less than 2.");
}
hash_bucket_nums.push_back(bucket_num);
}
}
}
if (partition_schema.hash_schema().empty() &&
!FLAGS_create_table_hash_bucket_nums.empty()) {
return Status::InvalidArgument("There are no hash partitions defined in this table.");
}
int i = 0;
for (const auto& hash_dimension : partition_schema.hash_schema()) {
const int num_buckets = hash_bucket_nums[i] != -1 ? hash_bucket_nums[i] :
hash_dimension.num_buckets;
const auto hash_columns = convert_column_ids_to_names(hash_dimension.column_ids);
table_creator->add_hash_partitions(hash_columns,
num_buckets,
static_cast<int32_t>(hash_dimension.seed));
i++;
}
// Add range partition schema.
if (!partition_schema.range_schema().column_ids.empty()) {
const auto range_columns
= convert_column_ids_to_names(partition_schema.range_schema().column_ids);
table_creator->set_range_partition_columns(range_columns);
}
// Add range bounds for each range partition.
vector<Partition> partitions;
RETURN_NOT_OK(src_table->ListPartitions(&partitions));
for (const auto& partition : partitions) {
    // Each range partition appears once per combination of hash buckets; keep only the copy
    // whose hash bucket indexes are all zero so that every range partition is added exactly once.
const auto& hash_buckets = partition.hash_buckets();
if (!std::all_of(hash_buckets.begin(),
hash_buckets.end(),
[](int32_t bucket) { return bucket == 0; })) {
continue;
}
// Partitions are considered metadata, so don't redact them.
const ScopedDisableRedaction no_redaction;
Arena arena(256);
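    // Decode the raw encoded range bounds of this partition back into partial rows on the
    // destination schema, so they can be handed to KuduTableCreator::add_range_partition().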
std::unique_ptr<KuduPartialRow> lower(new KuduPartialRow(&dst_schema_internal));
std::unique_ptr<KuduPartialRow> upper(new KuduPartialRow(&dst_schema_internal));
Slice range_key_start(partition.begin().range_key());
Slice range_key_end(partition.end().range_key());
RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_start, lower.get(), &arena));
RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_end, upper.get(), &arena));
table_creator->add_range_partition(lower.release(), upper.release());
}
if (partition_schema.range_schema().column_ids.empty()) {
    // The source table has no range partitioning; create the destination table range
    // partitioned on no columns.
table_creator->set_range_partition_columns({});
}
table_creator->set_allow_empty_partition(true);
// Create table.
RETURN_NOT_OK(table_creator->Create());
// Drop the dummy columns.
if (!to_delete_columns.empty()) {
unique_ptr<client::KuduTableAlterer> alterer(dst_client->NewTableAlterer(dst_table_name));
for (const auto &to_delete_column: to_delete_columns) {
VLOG(1) << Substitute("Drop dummy column $0", to_delete_column);
alterer->DropColumn(to_delete_column);
}
RETURN_NOT_OK(alterer->Alter());
}
  // Re-open the destination table and verify that its schema matches the source schema.
RETURN_NOT_OK(dst_client->OpenTable(dst_table_name, &dst_table));
RETURN_NOT_OK(SchemasMatch(src_schema_internal,
KuduSchema::ToSchema(dst_table->schema())));
LOG(INFO) << "Table " << dst_table_name << " created successfully";
return Status::OK();
}