in src/kudu/tserver/tablet_service.cc [2759:3044]
// Handles the first RPC of a scan, i.e. a ScanRequestPB that carries a
// new_scan_request.
//
// Steps, in order:
//  1. Creates a scanner for 'replica' owned by the RPC's remote user and locks
//     it for access. Unless the function reaches the point where the scanner
//     is handed its iterator, the scanner is unregistered on return.
//  2. Validates the client projection, the order mode and (for ORDERED scans)
//     the read mode. Then builds and optimizes the ScanSpec and builds the
//     scan projection: the client's columns plus any columns the predicates
//     need. For ORDERED scans, all tablet key columns come first.
//  3. Returns OK early with '*has_more_results' = false if the spec can be
//     short-circuited.
//  4. Creates and initializes a row iterator according to the read mode, then
//     verifies that the snapshot timestamp(s) are legal.
//  5. If rows remain:
//     - hands the iterator to the scanner and keeps the scanner registered;
//     - reports the scanner's id via '*scanner_id';
//     - when the request's max batch size hint is positive, continues the scan
//       through HandleContinueScanRequest so the first batch is returned.
//
// Parameters:
//  'replica'          - the tablet replica being scanned.
//  'req'              - the scan request; must carry a new_scan_request.
//  'rpc_context'      - RPC context; its remote user becomes the scanner owner.
//  'result_collector' - receives rows/serializer setup; must be non-null.
//  'scanner_id'       - set only when a server-side scanner is kept open.
//  'snap_timestamp'   - passed to HandleScanAtSnapshot for READ_YOUR_WRITES and
//                       READ_AT_SNAPSHOT (presumably filled in there; TODO
//                       confirm). It is dereferenced for every read mode when
//                       verifying snapshot legality, so it must be non-null.
//  'has_more_results' - whether the client should continue scanning.
//  'error_code'       - must be non-null. It is set on most, but not all, error
//                       paths; e.g. a non-InvalidArgument iterator failure only
//                       sets it when the tablet has been stopped.
Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
                                               const ScanRequestPB* req,
                                               const RpcContext* rpc_context,
                                               ScanResultCollector* result_collector,
                                               string* scanner_id,
                                               Timestamp* snap_timestamp,
                                               bool* has_more_results,
                                               TabletServerErrorPB::Code* error_code) {
  DCHECK(result_collector != nullptr);
  DCHECK(error_code != nullptr);
  DCHECK(req->has_new_scan_request());
  const NewScanRequestPB& scan_pb = req->new_scan_request();
  TRACE_EVENT2("tserver", "TabletServiceImpl::HandleNewScanRequest",
               "tablet_id", scan_pb.tablet_id(),
               "query_id", req->query_id());

  SharedScanner scanner;
  server_->scanner_manager()->NewScanner(replica,
                                         rpc_context->remote_user(),
                                         scan_pb.row_format_flags(),
                                         &scanner);
  TRACE("Created scanner $0 for tablet $1, query id is $2",
        scanner->id(), scanner->tablet_id(), req->query_id());
  auto scanner_lock = scanner->LockForAccess();

  // If we early-exit out of this function, automatically unregister
  // the scanner.
  ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
  ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());

  // Create the user's requested projection.
  // TODO(todd): Add test cases for bad projections including 0 columns.
  Schema projection;
  Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SCHEMA;
    return s;
  }

  if (projection.has_column_ids()) {
    *error_code = TabletServerErrorPB::INVALID_SCHEMA;
    return Status::InvalidArgument("User requests should not have Column IDs");
  }

  if (scan_pb.order_mode() == UNKNOWN_ORDER_MODE) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return Status::InvalidArgument("Unknown order mode specified");
  }

  if (scan_pb.order_mode() == ORDERED) {
    // Ordered scans must be at a snapshot so that we perform a serializable read (which can be
    // resumed). Otherwise, this would be read committed isolation, which is not resumable.
    if (scan_pb.read_mode() != READ_AT_SNAPSHOT) {
      *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
      return Status::InvalidArgument("Cannot do an ordered scan that is not a snapshot read");
    }
  }

  // Hold the SchemaPtr for the rest of the function so 'tablet_schema' stays
  // valid.
  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
  const Schema& tablet_schema = *tablet_schema_ptr;

  ScanSpec spec;
  s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  VLOG(3) << "Before optimizing scan spec: " << spec.ToString(tablet_schema);
  spec.PruneInlistValuesIfPossible(tablet_schema,
                                   replica->tablet_metadata()->partition(),
                                   replica->tablet_metadata()->partition_schema());
  spec.OptimizeScan(tablet_schema, scanner->arena(), true);
  VLOG(3) << "After optimizing scan spec: " << spec.ToString(tablet_schema);

  // Missing columns will contain the columns that are not mentioned in the
  // client projection but are actually needed for the scan, such as columns
  // referred to by predicates.
  //
  // NOTE: We should build the missing column after optimizing scan which will
  // remove unnecessary predicates.
  vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);

  // Build a new projection with the projection columns and the missing columns,
  // annotating each column as a key column appropriately.
  //
  // Note: the projection is a consistent schema (i.e. no duplicate columns).
  // However, it has some different semantics as compared to the tablet schema:
  // - It might not contain all of the columns in the tablet.
  // - It might contain extra columns not found in the tablet schema. Virtual
  //   columns are permitted, while others will cause the scan to fail later,
  //   when the tablet validates the projection.
  // - It doesn't know which of its columns are key columns. That's fine for
  //   an UNORDERED scan, but we'll need to fix this for an ORDERED scan, which
  //   requires all key columns in tablet schema order.
  SchemaBuilder projection_builder;
  if (scan_pb.order_mode() == ORDERED) {
    for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
      const auto& col = tablet_schema.column(i);
      // CHECK_OK is safe because the tablet schema has no duplicate columns.
      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ true));
    }
    for (int i = 0; i < projection.num_columns(); i++) {
      const auto& col = projection.column(i);
      // Any key columns in the projection will be ignored.
      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
    }
    for (const ColumnSchema& col : missing_cols) {
      // Any key columns in 'missing_cols' will be ignored.
      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
    }
  } else {
    projection_builder.Reset(projection);
    for (const ColumnSchema& col : missing_cols) {
      // CHECK_OK is safe because the builder's columns (from the projection)
      // and the missing columns are disjoint sets.
      //
      // UNORDERED scans don't need to know which columns are part of the key.
      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false));
    }
  }

  // Store the client's specified projection, prior to adding any missing
  // columns for predicates, etc.
  auto client_projection = std::make_unique<Schema>(std::move(projection));
  // 'projection' was moved from above; reassigning it here gives it the full
  // scan projection (client columns plus missing columns).
  projection = projection_builder.BuildWithoutIds();
  VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);

  s = result_collector->InitSerializer(scan_pb.row_format_flags(),
                                       projection,
                                       *client_projection);
  if (!s.ok()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  if (spec.CanShortCircuit()) {
    VLOG(1) << "short-circuiting without creating a server-side scanner.";
    *has_more_results = false;
    return Status::OK();
  }

  // It's important to keep the reference to the tablet for the case when the
  // tablet replica's shutdown is run concurrently with the code below.
  shared_ptr<Tablet> tablet;
  RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code));

  // Ensure the tablet has a valid clean time.
  s = tablet->mvcc_manager()->CheckIsCleanTimeInitialized();
  if (!s.ok()) {
    LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1",
                               tablet->tablet_id(), s.ToString());
    // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail
    // over to another tablet server if fault-tolerant).
    *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
    return s;
  }

  unique_ptr<RowwiseIterator> iter;
  optional<Timestamp> snap_start_timestamp;
  {
    TRACE("Creating iterator");
    TRACE_EVENT0("tserver", "Create iterator");

    switch (scan_pb.read_mode()) {
      case UNKNOWN_READ_MODE: {
        *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
        return Status::NotSupported("Unknown read mode.");
      }
      case READ_LATEST: {
        if (scan_pb.has_snap_start_timestamp()) {
          *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
          return Status::InvalidArgument("scan start timestamp is only supported "
                                         "in READ_AT_SNAPSHOT read mode");
        }
        s = tablet->NewRowIterator(projection, &iter);
        break;
      }
      case READ_YOUR_WRITES: // Fallthrough intended
      case READ_AT_SNAPSHOT: {
        s = HandleScanAtSnapshot(
            scan_pb, rpc_context, projection, tablet.get(), replica->time_manager(),
            &iter, &snap_start_timestamp, snap_timestamp, error_code);
        break;
      }
    }
    TRACE("Iterator created");
  }

  // Make a copy of the optimized spec before it's passed to the iterator.
  // This copy will be given to the Scanner so it can report its predicates to
  // /scans. The copy is necessary because the original spec will be modified
  // as its predicates are pushed into lower-level iterators.
  auto orig_spec = std::make_unique<ScanSpec>(spec);

  if (PREDICT_TRUE(s.ok())) {
    TRACE_EVENT0("tserver", "iter->Init");
    s = iter->Init(&spec);
    if (PREDICT_FALSE(s.IsInvalidArgument())) {
      // Tablet::Iterator::Init() returns InvalidArgument when an invalid
      // projection is specified.
      // TODO(todd): would be nice if we threaded these more specific
      // error codes throughout Kudu.
      *error_code = TabletServerErrorPB::MISMATCHED_SCHEMA;
      return s;
    }
  }

  TRACE("Iterator init: $0", s.ToString());

  if (PREDICT_FALSE(!s.ok())) {
    // Arguments follow the placeholders: $1 is the request and $2 is the
    // failure status. Passing them in the other order would print the status
    // where the message labels the request.
    LOG(WARNING) << Substitute(
        "error setting up scanner $0 with request: $1: $2",
        scanner->id(), SecureShortDebugString(*req), s.ToString());
    // If the replica has been stopped, e.g. due to disk failure, return
    // TABLET_FAILED so the scan can be handled appropriately (fail over to
    // another tablet server if fault-tolerant).
    if (tablet->HasBeenStopped()) {
      *error_code = TabletServerErrorPB::TABLET_FAILED;
    }
    return s;
  }

  // If this is a snapshot scan and the user specified a specific timestamp to
  // scan at, then check that we are not attempting to scan at a time earlier
  // than the ancient history mark. Only perform this check if tablet history
  // GC is enabled.
  //
  // TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT
  // when history_max_age is set to zero. There is a tablet history GC related
  // race when the history max age is set to very low, or zero. Imagine a case
  // where a scan was started and READ_AT_SNAPSHOT was specified without
  // specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. The
  // above code path will select the latest timestamp (under a lock) prior to
  // calling RowIterator::Init(), which actually opens the blocks. That means
  // that there is an opportunity in between those two calls for tablet history
  // GC to kick in and delete some history. In fact, we may easily not actually
  // end up with a valid snapshot in that case. It would be more correct to
  // initialize the row iterator and then select the latest timestamp
  // represented by those open files in that case.
  //
  // Now that we have initialized our row iterator at a snapshot, return an
  // error if the snapshot timestamp was prior to the ancient history mark.
  // We have to check after we open the iterator in order to avoid a TOCTOU
  // error since it's possible that initializing the row iterator could race
  // against the tablet history GC maintenance task.
  RETURN_NOT_OK_EVAL(VerifyLegalSnapshotTimestamps(tablet.get(), scan_pb.read_mode(),
                                                   snap_start_timestamp, *snap_timestamp),
                     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT);

  *has_more_results = iter->HasNext() && !scanner->has_fulfilled_limit();
  TRACE("has_more: $0", *has_more_results);
  if (!*has_more_results) {
    // If there are no more rows, we can short circuit some work and respond immediately.
    VLOG(1) << "No more rows, short-circuiting out without creating a server-side scanner.";
    return Status::OK();
  }

  // From here on the scanner owns the iterator, spec copy and client
  // projection, and stays registered after this function returns.
  scanner->Init(std::move(iter), std::move(orig_spec), std::move(client_projection));

  // Stop the scanner timer because ContinueScanRequest starts its own timer.
  scanner_timer.Stop();
  unreg_scanner.Cancel();
  *scanner_id = scanner->id();

  VLOG(1) << "Started scanner " << scanner->id() << ": " << scanner->iter()->ToString();

  if (GetMaxBatchSizeBytesHint(req) > 0) {
    TRACE("Continuing scan request");
    // TODO(wdberkeley): Instead of copying the pb, instead split
    // HandleContinueScanRequest and call the second half directly. Once that's
    // done, remove the call to ScopedAddScannerTiming::Stop() above (and the
    // method as it won't be used) and start the timing for continue requests
    // from the first half that is no longer executed in this codepath.
    ScanRequestPB continue_req(*req);
    continue_req.set_scanner_id(scanner->id());
    // Release the access lock before HandleContinueScanRequest runs, since it
    // works on the same scanner (presumably re-acquiring access itself;
    // confirm in HandleContinueScanRequest).
    scanner_lock.Unlock();
    return HandleContinueScanRequest(
        &continue_req, rpc_context, result_collector, has_more_results, error_code);
  }

  // Increment the scanner call sequence ID. HandleContinueScanRequest handles
  // this in the non-empty scan case.
  scanner->IncrementCallSeqId();
  return Status::OK();
}