Status TestLoadGenerator()

in src/kudu/tools/tool_action_perf.cc [631:742]


// Entry point of the load generator tool action.
//
// Connects to the cluster whose master addresses are given by the
// kMasterAddressesArg required argument, then:
//   1. Uses the table named by FLAGS_table_name, or, when that flag is empty,
//      creates a three-column table with a generated name (optionally range-
//      and/or hash-partitioned on the key column).
//   2. Calls GenerateInsertRows() against the table and prints a timing report.
//   3. If FLAGS_run_scan is set, counts the rows via CountTableRows() and
//      compares the result with the number of rows reported as generated.
//   4. Drops the auto-created table unless FLAGS_keep_auto_table is set.
//
// Returns:
//   * InvalidArgument if no master address was supplied;
//   * RuntimeError if the generator returned a non-OK status, if any write
//     operation errors were counted, or if the scanned row count does not
//     match the generated row count;
//   * any non-OK status propagated from client/table/schema operations;
//   * OK otherwise.
//
// Note: every early error return happens before the final drop step, so an
// auto-created table is left in place when the run fails.
Status TestLoadGenerator(const RunnerContext& context) {
  const string& master_addresses_str =
      FindOrDie(context.required_args, kMasterAddressesArg);

  // Skip empty tokens so that an empty (or separator-only) argument yields an
  // empty vector and is rejected by the check below instead of being passed
  // on to the client builder as an empty address.
  vector<string> master_addrs(
      strings::Split(master_addresses_str, ",", strings::SkipEmpty()));
  if (master_addrs.empty()) {
    return Status::InvalidArgument(
        "At least one master address must be specified");
  }
  shared_ptr<KuduClient> client;
  RETURN_NOT_OK(KuduClientBuilder()
                .master_server_addrs(master_addrs)
                .Build(&client));
  string table_name;
  bool is_auto_table = false;
  if (!FLAGS_table_name.empty()) {
    table_name = FLAGS_table_name;
  } else {
    static const string kKeyColumnName = "key";

    // The auto-created table case.
    is_auto_table = true;
    ObjectIdGenerator oid_generator;
    // The generated name is optionally prefixed with "<FLAGS_auto_database>.".
    table_name = Substitute("$0loadgen_auto_$1",
        FLAGS_auto_database.empty() ? "" : FLAGS_auto_database + ".",
        oid_generator.Next());
    KuduSchema schema;
    KuduSchemaBuilder b;
    b.AddColumn(kKeyColumnName)->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32);
    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
    RETURN_NOT_OK(b.Build(&schema));

    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
    table_creator->table_name(table_name)
                  .schema(&schema);
    // A non-positive replica count leaves the table creator's setting untouched.
    if (FLAGS_table_num_replicas > 0) {
      table_creator->num_replicas(FLAGS_table_num_replicas);
    }
    if (FLAGS_table_num_range_partitions > 1) {
      // Split the generated span for a sequential workload evenly across all
      // tablets. In case we're inserting in random mode, use unbounded range
      // partitioning, so the table has key coverage of the entire keyspace.
      const int64_t total_inserted_span = SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
      const int64_t span_per_range = total_inserted_span / FLAGS_table_num_range_partitions;
      table_creator->set_range_partition_columns({ kKeyColumnName });
      // N range partitions require N - 1 split rows.
      for (int i = 1; i < FLAGS_table_num_range_partitions; i++) {
        unique_ptr<KuduPartialRow> split(schema.NewRow());
        int64_t split_val = FLAGS_seq_start + i * span_per_range;
        RETURN_NOT_OK(split->SetInt64(kKeyColumnName, split_val));
        // The raw pointer is released to the table creator.
        table_creator->add_range_partition_split(split.release());
      }
    }
    if (FLAGS_table_num_hash_partitions > 1) {
      table_creator->add_hash_partitions(
          vector<string>({ kKeyColumnName }), FLAGS_table_num_hash_partitions);
    }
    RETURN_NOT_OK(table_creator->Create());
  }
  cout << "Using " << (is_auto_table ? "auto-created " : "")
       << "table '" << table_name << "'" << endl;

  uint64_t total_row_count = 0;
  uint64_t total_err_count = 0;
  Stopwatch sw(Stopwatch::ALL_THREADS);
  sw.start();
  Status status = GenerateInsertRows(client, table_name,
                                     &total_row_count, &total_err_count);
  sw.stop();
  const double total = sw.elapsed().wall_millis();
  cout << endl << "Generator report" << endl
       << "  time total  : " << total << " ms" << endl;
  // Per-row timing is reported only when at least one row was generated, which
  // also avoids dividing by zero.
  if (total_row_count != 0) {
    cout << "  time per row: " << total / total_row_count << " ms" << endl;
  }
  if (!status.ok() || total_err_count != 0) {
    string err_str;
    if (!status.ok()) {
      // Append the status text verbatim: it must not be used as a Substitute()
      // format string, since any '$' in the message would be interpreted as
      // a placeholder marker.
      err_str = status.ToString();
    }
    if (total_err_count != 0) {
      if (!status.ok()) {
        err_str += "; ";
      }
      SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
                          total_err_count);
    }
    return Status::RuntimeError(err_str);
  }

  if (FLAGS_run_scan) {
    // Run a table scan to count inserted rows.
    uint64_t count = 0;
    RETURN_NOT_OK(CountTableRows(client, table_name, &count));
    cout << endl << "Scanner report" << endl
         << "  expected rows: " << total_row_count << endl
         << "  actual rows  : " << count << endl;
    if (count != total_row_count) {
      return Status::RuntimeError(
            Substitute("Row count mismatch: expected $0, actual $1",
                       total_row_count, count));
    }
  }

  if (is_auto_table && !FLAGS_keep_auto_table) {
    cout << "Dropping auto-created table '" << table_name << "'" << endl;
    // Drop the table which was automatically created to run the test.
    RETURN_NOT_OK(client->DeleteTable(table_name));
  }

  return Status::OK();
}