in src/kudu/tools/tool_action_perf.cc [631:742]
// Runs the load generator against the cluster whose master addresses are
// given in 'context.required_args' under kMasterAddressesArg.
//
// Steps, in order:
//   1. Build a client for the comma-separated master addresses.
//   2. Use the table named by FLAGS_table_name, or, if that flag is empty,
//      create a table with a generated name and a fixed three-column schema.
//   3. Call GenerateInsertRows() and print a timing report to stdout.
//   4. If FLAGS_run_scan is set, count the table rows and compare the result
//      with the number of rows reported by the generator.
//   5. Drop the auto-created table unless FLAGS_keep_auto_table is set.
//
// Returns:
//   - InvalidArgument if no master address was supplied;
//   - RuntimeError if row generation failed, if any write operation errors
//     were counted, or if the scanned row count differs from the expected one;
//   - any non-OK status from client building, schema building, table creation,
//     row counting or table deletion, propagated as is;
//   - OK otherwise.
//
// NOTE: every error return after the table has been created leaves the
// auto-created table in place; it is dropped only on the success path.
Status TestLoadGenerator(const RunnerContext& context) {
  const string& master_addresses_str =
      FindOrDie(context.required_args, kMasterAddressesArg);
  vector<string> master_addrs(strings::Split(master_addresses_str, ","));
  if (master_addrs.empty()) {
    return Status::InvalidArgument(
        "At least one master address must be specified");
  }

  shared_ptr<KuduClient> client;
  RETURN_NOT_OK(KuduClientBuilder()
                .master_server_addrs(master_addrs)
                .Build(&client));

  string table_name;
  bool is_auto_table = false;
  if (!FLAGS_table_name.empty()) {
    table_name = FLAGS_table_name;
  } else {
    static const string kKeyColumnName = "key";

    // The auto-created table case.
    is_auto_table = true;
    ObjectIdGenerator oid_generator;
    // The generated name is "<database>.loadgen_auto_<id>", or just
    // "loadgen_auto_<id>" when FLAGS_auto_database is empty.
    table_name = Substitute("$0loadgen_auto_$1",
        FLAGS_auto_database.empty() ? "" : FLAGS_auto_database + ".",
        oid_generator.Next());

    KuduSchema schema;
    KuduSchemaBuilder b;
    b.AddColumn(kKeyColumnName)->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32);
    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
    RETURN_NOT_OK(b.Build(&schema));

    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
    table_creator->table_name(table_name)
                  .schema(&schema);
    // A non-positive replica count means the flag is not applied to the
    // table creator at all.
    if (FLAGS_table_num_replicas > 0) {
      table_creator->num_replicas(FLAGS_table_num_replicas);
    }
    if (FLAGS_table_num_range_partitions > 1) {
      // Split the generated span for a sequential workload evenly across all
      // tablets. In case we're inserting in random mode, use unbounded range
      // partitioning, so the table has key coverage of the entire keyspace.
      const int64_t total_inserted_span = SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
      const int64_t span_per_range = total_inserted_span / FLAGS_table_num_range_partitions;
      table_creator->set_range_partition_columns({ kKeyColumnName });
      // N range partitions need N - 1 split points, hence the loop from 1.
      for (int i = 1; i < FLAGS_table_num_range_partitions; i++) {
        unique_ptr<KuduPartialRow> split(schema.NewRow());
        int64_t split_val = FLAGS_seq_start + i * span_per_range;
        RETURN_NOT_OK(split->SetInt64(kKeyColumnName, split_val));
        // The row is released from the unique_ptr and handed to the creator.
        table_creator->add_range_partition_split(split.release());
      }
    }
    if (FLAGS_table_num_hash_partitions > 1) {
      table_creator->add_hash_partitions(
          vector<string>({ kKeyColumnName }), FLAGS_table_num_hash_partitions);
    }
    RETURN_NOT_OK(table_creator->Create());
  }
  cout << "Using " << (is_auto_table ? "auto-created " : "")
       << "table '" << table_name << "'" << endl;

  uint64_t total_row_count = 0;
  uint64_t total_err_count = 0;
  Stopwatch sw(Stopwatch::ALL_THREADS);
  sw.start();
  Status status = GenerateInsertRows(client, table_name,
                                     &total_row_count, &total_err_count);
  sw.stop();
  const double total = sw.elapsed().wall_millis();
  cout << endl << "Generator report" << endl
       << "  time total  : " << total << " ms" << endl;
  // Skip the per-row figure when nothing was generated to avoid dividing
  // by zero.
  if (total_row_count != 0) {
    cout << "  time per row: " << total / total_row_count << " ms" << endl;
  }

  if (!status.ok() || total_err_count != 0) {
    string err_str;
    if (!status.ok()) {
      // Append the status text verbatim. It must not be passed as the format
      // argument of SubstituteAndAppend(): any '$' in the message would be
      // treated as a substitution placeholder.
      err_str += status.ToString();
    }
    if (total_err_count != 0) {
      if (!status.ok()) {
        err_str += "; ";
      }
      SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
                          total_err_count);
    }
    return Status::RuntimeError(err_str);
  }

  if (FLAGS_run_scan) {
    // Run a table scan to count inserted rows.
    uint64_t count = 0;
    RETURN_NOT_OK(CountTableRows(client, table_name, &count));
    cout << endl << "Scanner report" << endl
         << "  expected rows: " << total_row_count << endl
         << "  actual rows  : " << count << endl;
    if (count != total_row_count) {
      return Status::RuntimeError(
          Substitute("Row count mismatch: expected $0, actual $1",
                     total_row_count, count));
    }
  }

  if (is_auto_table && !FLAGS_keep_auto_table) {
    cout << "Dropping auto-created table '" << table_name << "'" << endl;
    // Drop the table which was automatically created to run the test.
    RETURN_NOT_OK(client->DeleteTable(table_name));
  }

  return Status::OK();
}