Status TabletServiceImpl::HandleNewScanRequest()

in src/kudu/tserver/tablet_service.cc [2759:3044]


// Handles the first RPC of a scan: a ScanRequestPB carrying a NewScanRequestPB.
//
// Steps, in order:
//   1. Validate the client's projection and scan options.
//   2. Build and optimize a ScanSpec.
//   3. Initialize the result collector's serializer.
//   4. Create and initialize a row iterator on the tablet.
//   5. If rows remain, keep the scanner registered as a server-side scanner.
// When the request carries a positive max-batch-size hint, the call then
// continues into HandleContinueScanRequest() to produce the first batch.
//
// Parameters:
//   replica          - the tablet replica to scan.
//   req              - the scan request; must have 'new_scan_request' set.
//   rpc_context      - RPC context. Supplies the remote user and is forwarded
//                      to snapshot handling and to HandleContinueScanRequest().
//   result_collector - receives rows; its serializer is initialized here.
//   scanner_id       - set to the new scanner's id only when the scanner is
//                      kept alive, i.e. more results are expected.
//   snap_timestamp   - written by HandleScanAtSnapshot() for READ_AT_SNAPSHOT
//                      and READ_YOUR_WRITES scans. Its value is then passed
//                      to VerifyLegalSnapshotTimestamps().
//   has_more_results - set to whether further continue requests are needed.
//   error_code       - set to a TabletServerErrorPB code on most error paths.
//
// Returns OK on success. This includes the short-circuit paths where no
// server-side scanner is kept. Otherwise returns the failing status.
// Any return before unreg_scanner.Cancel() unregisters the scanner created
// here.
Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
                                               const ScanRequestPB* req,
                                               const RpcContext* rpc_context,
                                               ScanResultCollector* result_collector,
                                               string* scanner_id,
                                               Timestamp* snap_timestamp,
                                               bool* has_more_results,
                                               TabletServerErrorPB::Code* error_code) {
  DCHECK(result_collector != nullptr);
  DCHECK(error_code != nullptr);
  DCHECK(req->has_new_scan_request());
  const NewScanRequestPB& scan_pb = req->new_scan_request();
  TRACE_EVENT2("tserver", "TabletServiceImpl::HandleNewScanRequest",
               "tablet_id", scan_pb.tablet_id(),
               "query_id", req->query_id());
  SharedScanner scanner;
  server_->scanner_manager()->NewScanner(replica,
                                         rpc_context->remote_user(),
                                         scan_pb.row_format_flags(),
                                         &scanner);
  TRACE("Created scanner $0 for tablet $1, query id is $2",
        scanner->id(), scanner->tablet_id(), req->query_id());
  auto scanner_lock = scanner->LockForAccess();

  // If we early-exit out of this function, automatically unregister
  // the scanner.
  ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
  ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());

  // Create the user's requested projection.
  // TODO(todd): Add test cases for bad projections including 0 columns.
  Schema projection;
  Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SCHEMA;
    return s;
  }

  if (projection.has_column_ids()) {
    *error_code = TabletServerErrorPB::INVALID_SCHEMA;
    return Status::InvalidArgument("User requests should not have Column IDs");
  }

  if (scan_pb.order_mode() == UNKNOWN_ORDER_MODE) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return Status::InvalidArgument("Unknown order mode specified");
  }

  if (scan_pb.order_mode() == ORDERED) {
    // Ordered scans must be at a snapshot so that we perform a serializable read (which can be
    // resumed). Otherwise, this would be read committed isolation, which is not resumable.
    if (scan_pb.read_mode() != READ_AT_SNAPSHOT) {
      *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
      return Status::InvalidArgument("Cannot do an ordered scan that is not a snapshot read");
    }
  }

  // Hold the SchemaPtr for the rest of the call so 'tablet_schema' stays valid.
  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
  const Schema& tablet_schema = *tablet_schema_ptr;

  ScanSpec spec;
  s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  VLOG(3) << "Before optimizing scan spec: " << spec.ToString(tablet_schema);
  spec.PruneInlistValuesIfPossible(tablet_schema,
                                   replica->tablet_metadata()->partition(),
                                   replica->tablet_metadata()->partition_schema());
  spec.OptimizeScan(tablet_schema, scanner->arena(), true);
  VLOG(3) << "After optimizing scan spec: " << spec.ToString(tablet_schema);

  // Missing columns will contain the columns that are not mentioned in the
  // client projection but are actually needed for the scan, such as columns
  // referred to by predicates.
  //
  // NOTE: We should build the missing column after optimizing scan which will
  // remove unnecessary predicates.
  vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);

  // Build a new projection with the projection columns and the missing columns,
  // annotating each column as a key column appropriately.
  //
  // Note: the projection is a consistent schema (i.e. no duplicate columns).
  // However, it has some different semantics as compared to the tablet schema:
  // - It might not contain all of the columns in the tablet.
  // - It might contain extra columns not found in the tablet schema. Virtual
  //   columns are permitted, while others will cause the scan to fail later,
  //   when the tablet validates the projection.
  // - It doesn't know which of its columns are key columns. That's fine for
  //   an UNORDERED scan, but we'll need to fix this for an ORDERED scan, which
  //   requires all key columns in tablet schema order.
  SchemaBuilder projection_builder;
  if (scan_pb.order_mode() == ORDERED) {
    for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
      const auto& col = tablet_schema.column(i);
      // CHECK_OK is safe because the tablet schema has no duplicate columns.
      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ true));
    }
    for (int i = 0; i < projection.num_columns(); i++) {
      const auto& col = projection.column(i);
      // Any key columns in the projection will be ignored.
      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
    }
    for (const ColumnSchema& col : missing_cols) {
      // Any key columns in 'missing_cols' will be ignored.
      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
    }
  } else {
    projection_builder.Reset(projection);
    for (const ColumnSchema& col : missing_cols) {
      // CHECK_OK is safe because the builder's columns (from the projection)
      // and the missing columns are disjoint sets.
      //
      // UNORDERED scans don't need to know which columns are part of the key.
      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false));
    }
  }

  // Store the client's specified projection, prior to adding any missing
  // columns for predicates, etc. 'projection' is moved-from here and is
  // immediately reassigned to the full scan projection below.
  unique_ptr<Schema> client_projection(new Schema(std::move(projection)));
  projection = projection_builder.BuildWithoutIds();
  VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);

  s = result_collector->InitSerializer(scan_pb.row_format_flags(),
                                       projection,
                                       *client_projection);
  if (!s.ok()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  if (spec.CanShortCircuit()) {
    VLOG(1) << "short-circuiting without creating a server-side scanner.";
    *has_more_results = false;
    return Status::OK();
  }

  // It's important to keep the reference to the tablet for the case when the
  // tablet replica's shutdown is run concurrently with the code below.
  shared_ptr<Tablet> tablet;
  RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code));

  // Ensure the tablet has a valid clean time.
  s = tablet->mvcc_manager()->CheckIsCleanTimeInitialized();
  if (!s.ok()) {
    LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1",
                               tablet->tablet_id(), s.ToString());
    // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail
    // over to another tablet server if fault-tolerant).
    *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
    return s;
  }

  unique_ptr<RowwiseIterator> iter;
  optional<Timestamp> snap_start_timestamp;

  {
    TRACE("Creating iterator");
    TRACE_EVENT0("tserver", "Create iterator");

    switch (scan_pb.read_mode()) {
      case UNKNOWN_READ_MODE: {
        *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
        return Status::NotSupported("Unknown read mode.");
      }
      case READ_LATEST: {
        if (scan_pb.has_snap_start_timestamp()) {
          *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
          return Status::InvalidArgument("scan start timestamp is only supported "
                                         "in READ_AT_SNAPSHOT read mode");
        }
        s = tablet->NewRowIterator(projection, &iter);
        break;
      }
      case READ_YOUR_WRITES: // Fallthrough intended
      case READ_AT_SNAPSHOT: {
        s = HandleScanAtSnapshot(
            scan_pb, rpc_context, projection, tablet.get(), replica->time_manager(),
            &iter, &snap_start_timestamp, snap_timestamp, error_code);
        break;
      }
    }
    TRACE("Iterator created");
  }

  // Make a copy of the optimized spec before it's passed to the iterator.
  // This copy will be given to the Scanner so it can report its predicates to
  // /scans. The copy is necessary because the original spec will be modified
  // as its predicates are pushed into lower-level iterators.
  unique_ptr<ScanSpec> orig_spec(new ScanSpec(spec));

  if (PREDICT_TRUE(s.ok())) {
    TRACE_EVENT0("tserver", "iter->Init");
    s = iter->Init(&spec);
    if (PREDICT_FALSE(s.IsInvalidArgument())) {
      // Tablet::Iterator::Init() returns InvalidArgument when an invalid
      // projection is specified.
      // TODO(todd): would be nice if we threaded these more specific
      // error codes throughout Kudu.
      *error_code = TabletServerErrorPB::MISMATCHED_SCHEMA;
      return s;
    }
  }

  TRACE("Iterator init: $0", s.ToString());

  if (PREDICT_FALSE(!s.ok())) {
    // The format string labels $1 as the request and $2 as the error, so the
    // arguments must be passed in that order.
    LOG(WARNING) << Substitute(
        "error setting up scanner $0 with request: $1: $2",
        scanner->id(), SecureShortDebugString(*req), s.ToString());
    // If the replica has been stopped, e.g. due to disk failure, return
    // TABLET_FAILED so the scan can be handled appropriately (fail over to
    // another tablet server if fault-tolerant).
    if (tablet->HasBeenStopped()) {
      *error_code = TabletServerErrorPB::TABLET_FAILED;
    }
    return s;
  }

  // If this is a snapshot scan and the user specified a specific timestamp to
  // scan at, then check that we are not attempting to scan at a time earlier
  // than the ancient history mark. Only perform this check if tablet history
  // GC is enabled.
  //
  // TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT
  // when history_max_age is set to zero. There is a tablet history GC related
  // race when the history max age is set to very low, or zero. Imagine a case
  // where a scan was started and READ_AT_SNAPSHOT was specified without
  // specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. The
  // above code path will select the latest timestamp (under a lock) prior to
  // calling RowIterator::Init(), which actually opens the blocks. That means
  // that there is an opportunity in between those two calls for tablet history
  // GC to kick in and delete some history. In fact, we may easily not actually
  // end up with a valid snapshot in that case. It would be more correct to
  // initialize the row iterator and then select the latest timestamp
  // represented by those open files in that case.
  //
  // Now that we have initialized our row iterator at a snapshot, return an
  // error if the snapshot timestamp was prior to the ancient history mark.
  // We have to check after we open the iterator in order to avoid a TOCTOU
  // error since it's possible that initializing the row iterator could race
  // against the tablet history GC maintenance task.
  RETURN_NOT_OK_EVAL(VerifyLegalSnapshotTimestamps(tablet.get(), scan_pb.read_mode(),
                                                   snap_start_timestamp, *snap_timestamp),
                     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT);

  *has_more_results = iter->HasNext() && !scanner->has_fulfilled_limit();
  TRACE("has_more: $0", *has_more_results);
  if (!*has_more_results) {
    // If there are no more rows, we can short circuit some work and respond immediately.
    VLOG(1) << "No more rows, short-circuiting out without creating a server-side scanner.";
    return Status::OK();
  }

  // Hand ownership of the iterator, the spec copy and the client projection
  // to the scanner, which outlives this call from here on.
  scanner->Init(std::move(iter), std::move(orig_spec), std::move(client_projection));

  // Stop the scanner timer because ContinueScanRequest starts its own timer.
  scanner_timer.Stop();
  // The scanner is now fully set up: keep it registered.
  unreg_scanner.Cancel();
  *scanner_id = scanner->id();

  VLOG(1) << "Started scanner " << scanner->id() << ": " << scanner->iter()->ToString();

  if (GetMaxBatchSizeBytesHint(req) > 0) {
    TRACE("Continuing scan request");
    // TODO(wdberkeley): Instead of copying the pb, instead split
    // HandleContinueScanRequest and call the second half directly. Once that's
    // done, remove the call to ScopedAddScannerTiming::Stop() above (and the
    // method as it won't be used) and start the timing for continue requests
    // from the first half that is no longer executed in this codepath.
    ScanRequestPB continue_req(*req);
    continue_req.set_scanner_id(scanner->id());
    // Release our access lock before HandleContinueScanRequest() looks the
    // scanner up again by id.
    scanner_lock.Unlock();
    return HandleContinueScanRequest(
        &continue_req, rpc_context, result_collector, has_more_results, error_code);
  }

  // Increment the scanner call sequence ID. HandleContinueScanRequest handles
  // this in the non-empty scan case.
  scanner->IncrementCallSeqId();
  return Status::OK();
}