Status TableScanner::StartWork(WorkType work_type)

in src/kudu/tools/table_scanner.cc [899:1033]


Status TableScanner::StartWork(WorkType work_type) {
  client::sp::shared_ptr<KuduTable> src_table;
  RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));

  // Create destination table if needed.
  if (work_type == WorkType::kCopy) {
    RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_));
    if (FLAGS_write_type.empty()) {
      // Create table only.
      return Status::OK();
    }
  }

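  // Build scan tokens that partition the scan (typically one token per tablet);
  // ScanTask/CopyTask later turn each token back into a KuduScanner.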
  KuduScanTokenBuilder builder(src_table.get());
  RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache));
  if (mode_) {
    RETURN_NOT_OK(builder.SetReadMode(*mode_));
  }
  if (scan_batch_size_ >= 0) {
    // Batch size of 0 is valid and has special semantics: the server sends
    // zero rows (i.e. no data) in the very first scan batch sent back to the
    // client. See {KuduScanner,KuduScanTokenBuilder}::SetBatchSizeBytes().
    RETURN_NOT_OK(builder.SetBatchSizeBytes(scan_batch_size_));
  }
  RETURN_NOT_OK(builder.SetSelection(replica_selection_));
  RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
  if (FLAGS_fault_tolerant) {
    // TODO(yingchun): push down this judgement to ScanConfiguration::SetFaultTolerant
    if (mode_ && *mode_ != KuduScanner::READ_AT_SNAPSHOT) {
      return Status::InvalidArgument(
          "--fault_tolerant requires the READ_AT_SNAPSHOT read mode");
    }
    RETURN_NOT_OK(builder.SetFaultTolerant());
  }

  // Set projection if needed.
  if (work_type == WorkType::kScan) {
    const auto project_all = FLAGS_columns == "*" || FLAGS_columns.empty();
    if (!project_all || FLAGS_row_count_only) {
      vector<string> projected_column_names;
      if (!FLAGS_row_count_only && !FLAGS_columns.empty()) {
        projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty());
      }
      RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
    }
  }

  if (work_type == WorkType::kCopy) {
    // When copying a table, skip the auto-incrementing column in the projection:
    // it is populated automatically on the destination server side, so excluding
    // it avoids scanning an entire column of the source table.
    if (src_table->schema().GetAutoIncrementingColumnIndex() != -1) {
      vector<string> projected_column_names;
      for (size_t i = 0; i < src_table->schema().num_columns(); i++) {
        if (src_table->schema().Column(i).name() == KuduSchema::GetAutoIncrementingColumnName()) {
          continue;
        }
        projected_column_names.emplace_back(src_table->schema().Column(i).name());
      }
      RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
    }
    // Ensure both the source and destination table schemas are identical at this point.
    client::sp::shared_ptr<KuduTable> dst_table;
    RETURN_NOT_OK(dst_client_->get()->OpenTable(*dst_table_name_, &dst_table));
    if (dst_table->schema() != src_table->schema()) {
      return Status::InvalidArgument("source and destination tables should have the same schema");
    }
  }

  // Set predicates.
  RETURN_NOT_OK(AddPredicates(src_table, &builder));

  vector<KuduScanToken*> tokens;
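  // ElementDeleter frees every KuduScanToken in 'tokens' when it goes out of scope.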
  ElementDeleter deleter(&tokens);
  RETURN_NOT_OK(builder.Build(&tokens));

  const int num_threads = FLAGS_num_threads;

  // Apply the tablet filter: keep only tokens whose tablet ID appears in
  // --tablets (an empty --tablets keeps them all).
  const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
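  // Distribute the matching tokens round-robin across the worker threads so each
  // thread handles a roughly even share of the tablets.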
  map<int, vector<KuduScanToken*>> thread_tokens;
  int i = 0;
  for (auto* token : tokens) {
    if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
      thread_tokens[i++ % num_threads].emplace_back(token);
    }
  }

  RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
                  .set_max_threads(num_threads)
                  .set_idle_timeout(MonoDelta::FromMilliseconds(1))
                  .Build(&thread_pool_));

  // Initialize statuses for each thread.
  vector<Status> thread_statuses(num_threads);

  Stopwatch sw(Stopwatch::THIS_THREAD);
  sw.start();
  for (i = 0; i < num_threads; ++i) {
    auto* t_tokens = &thread_tokens[i];
    auto* t_status = &thread_statuses[i];
    if (work_type == WorkType::kScan) {
      RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
                                         { this->ScanTask(*t_tokens, t_status); }));
    } else {
      DCHECK(work_type == WorkType::kCopy);
      RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
                                         { this->CopyTask(*t_tokens, t_status); }));
    }
  }
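  // Poll and log progress every 5 seconds until all scan/copy tasks complete.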
  while (!thread_pool_->WaitFor(MonoDelta::FromSeconds(5))) {
    LOG(INFO) << "Scanned count: " << total_count_;
  }
  thread_pool_->Shutdown();

  sw.stop();
  if (out_) {
    *out_ << "Total count " << total_count_
        << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
  }

  const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying";
  Status result_status;
  for (const auto& s : thread_statuses) {
    if (!s.ok()) {
      if (out_) {
        *out_ << operation << " failed: " << s.ToString() << endl;
      }
      if (result_status.ok()) {
        result_status = s;
      }
    }
  }

  return result_status;
}
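
The fan-out above — build scan tokens once, filter them, then hand each worker thread a disjoint subset — can be reproduced with the public Kudu C++ client API alone. The following is a minimal sketch of that pattern, not part of table_scanner.cc: the master address, table name, and thread count are assumptions, std::thread stands in for the tool's internal ThreadPool, and error handling is reduced to an abort-on-error helper.

// Build (approximate): g++ -std=c++17 scan_tokens_example.cc -lkudu_client
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <kudu/client/client.h>
#include <kudu/client/scan_batch.h>

using kudu::Status;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduTable;

namespace {
// Abort on error to keep the sketch short; the real tool propagates Status.
void CheckOk(const Status& s) {
  if (!s.ok()) {
    std::cerr << s.ToString() << std::endl;
    std::abort();
  }
}
} // anonymous namespace

int main() {
  kudu::client::sp::shared_ptr<KuduClient> client;
  CheckOk(KuduClientBuilder()
              .add_master_server_addr("master.example.com:7051")  // assumed address
              .Build(&client));

  kudu::client::sp::shared_ptr<KuduTable> table;
  CheckOk(client->OpenTable("example_table", &table));  // assumed table name

  // Build scan tokens, the same partitioning step StartWork() performs.
  KuduScanTokenBuilder builder(table.get());
  std::vector<KuduScanToken*> tokens;
  CheckOk(builder.Build(&tokens));

  // Hand out tokens round-robin, mirroring the thread_tokens map above.
  constexpr int kNumThreads = 4;  // stands in for --num_threads
  std::vector<std::vector<KuduScanToken*>> per_thread(kNumThreads);
  for (size_t i = 0; i < tokens.size(); ++i) {
    per_thread[i % kNumThreads].push_back(tokens[i]);
  }

  std::atomic<uint64_t> total_rows(0);
  std::vector<std::thread> workers;
  for (int t = 0; t < kNumThreads; ++t) {
    workers.emplace_back([t, &per_thread, &total_rows]() {
      for (const KuduScanToken* token : per_thread[t]) {
        // Rehydrate the token into a scanner and drain it, counting rows.
        KuduScanner* raw_scanner = nullptr;
        CheckOk(token->IntoKuduScanner(&raw_scanner));
        std::unique_ptr<KuduScanner> scanner(raw_scanner);
        CheckOk(scanner->Open());
        KuduScanBatch batch;
        while (scanner->HasMoreRows()) {
          CheckOk(scanner->NextBatch(&batch));
          total_rows += batch.NumRows();
        }
      }
    });
  }
  for (auto& w : workers) w.join();
  for (auto* token : tokens) delete token;  // tokens are caller-owned

  std::cout << "Total rows: " << total_rows << std::endl;
  return 0;
}

The sketch omits the projection, predicate, and fault-tolerance configuration that StartWork() applies to the builder, as well as the --tablets filtering it applies to the resulting tokens.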