in src/kudu/tools/table_scanner.cc [899:1033]
Status TableScanner::StartWork(WorkType work_type) {
client::sp::shared_ptr<KuduTable> src_table;
RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
// Create destination table if needed.
if (work_type == WorkType::kCopy) {
RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_));
if (FLAGS_write_type.empty()) {
// An empty --write_type means only the destination table should be created; no rows are copied.
return Status::OK();
}
}
KuduScanTokenBuilder builder(src_table.get());
RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache));
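// Apply the explicitly requested read mode, if any; otherwise the scanner's default (READ_LATEST) is used.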
if (mode_) {
RETURN_NOT_OK(builder.SetReadMode(*mode_));
}
if (scan_batch_size_ >= 0) {
// Batch size of 0 is valid and has special semantics: the server sends
// zero rows (i.e. no data) in the very first scan batch sent back to the
// client. See {KuduScanner,KuduScanTokenBuilder}::SetBatchSizeBytes().
RETURN_NOT_OK(builder.SetBatchSizeBytes(scan_batch_size_));
}
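// The replica selection policy (e.g. CLOSEST_REPLICA or LEADER_ONLY) and the scan timeout below
// are baked into every token built from this builder.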
RETURN_NOT_OK(builder.SetSelection(replica_selection_));
RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
if (FLAGS_fault_tolerant) {
// TODO(yingchun): push this check down into ScanConfiguration::SetFaultTolerant
if (mode_ && *mode_ != KuduScanner::READ_AT_SNAPSHOT) {
return Status::InvalidArgument(
"--fault_tolerant is compatible only with the READ_AT_SNAPSHOT read mode");
}
RETURN_NOT_OK(builder.SetFaultTolerant());
}
// Set projection if needed.
if (work_type == WorkType::kScan) {
const auto project_all = FLAGS_columns == "*" || FLAGS_columns.empty();
if (!project_all || FLAGS_row_count_only) {
vector<string> projected_column_names;
if (!FLAGS_row_count_only && !FLAGS_columns.empty()) {
projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty());
}
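// With --row_count_only the projection is left empty: a scan that projects no
// columns returns row counts without transferring any cell data.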
RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
}
}
if (work_type == WorkType::kCopy) {
// When copying a table, exclude the auto-incrementing column from the
// projection: the destination server populates it automatically, so skipping
// it also avoids scanning an entire column of the source table.
const KuduSchema& schema = src_table->schema();
if (schema.GetAutoIncrementingColumnIndex() != -1) {
vector<string> projected_column_names;
for (size_t i = 0; i < schema.num_columns(); i++) {
if (schema.Column(i).name() == KuduSchema::GetAutoIncrementingColumnName()) {
continue;
}
projected_column_names.emplace_back(schema.Column(i).name());
}
RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
}
// Ensure both the source and destination table schemas are identical at this point.
client::sp::shared_ptr<KuduTable> dst_table;
RETURN_NOT_OK(dst_client_->get()->OpenTable(*dst_table_name_, &dst_table));
if (dst_table->schema() != src_table->schema()) {
return Status::InvalidArgument("source and destination tables should have the same schema");
}
}
// Set predicates.
RETURN_NOT_OK(AddPredicates(src_table, &builder));
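// Build the scan tokens, one per source tablet; each token is later rehydrated
// into a KuduScanner by a worker thread.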
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
RETURN_NOT_OK(builder.Build(&tokens));
const int num_threads = FLAGS_num_threads;
// Set tablet filter.
const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
map<int, vector<KuduScanToken*>> thread_tokens;
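// Distribute the tokens that pass the tablet filter across worker threads in round-robin order.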
int i = 0;
for (auto* token : tokens) {
if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
thread_tokens[i++ % num_threads].emplace_back(token);
}
}
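// Size the pool so every per-thread task can run concurrently; the short idle
// timeout lets worker threads exit promptly once their work is done.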
RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
.set_max_threads(num_threads)
.set_idle_timeout(MonoDelta::FromMilliseconds(1))
.Build(&thread_pool_));
// One Status slot per worker thread; each task records its result in its own slot.
vector<Status> thread_statuses(num_threads);
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
for (i = 0; i < num_threads; ++i) {
auto* t_tokens = &thread_tokens[i];
auto* t_status = &thread_statuses[i];
if (work_type == WorkType::kScan) {
RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
{ this->ScanTask(*t_tokens, t_status); }));
} else {
DCHECK(work_type == WorkType::kCopy);
RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
{ this->CopyTask(*t_tokens, t_status); }));
}
}
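// Log progress every 5 seconds until all worker tasks have finished.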
while (!thread_pool_->WaitFor(MonoDelta::FromSeconds(5))) {
LOG(INFO) << "Scanned count: " << total_count_;
}
thread_pool_->Shutdown();
sw.stop();
if (out_) {
*out_ << "Total count " << total_count_
<< " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
}
const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying";
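// Print every per-thread failure, but propagate only the first non-OK status.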
Status result_status;
for (const auto& s : thread_statuses) {
if (!s.ok()) {
if (out_) {
*out_ << operation << " failed: " << s.ToString() << endl;
}
if (result_status.ok()) {
result_status = s;
}
}
}
return result_status;
}
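// For reference, a rough usage sketch of the CLI paths that drive this function.
// Flag names correspond to the gflags defined elsewhere in this file; column
// names and the destination table name below are placeholders, and the exact
// set of supported flags may differ between versions:
//
//   kudu table scan <master_addresses> <table_name> -columns=key,val -num_threads=4
//   kudu table copy <master_addresses> <table_name> <dest_master_addresses> -dst_table=my_copy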