Status KuduScanner::Data::OpenTablet()

in src/kudu/client/scanner-internal.cc [455:671]


// Opens a scanner on the tablet located via 'partition_key'.
//
// The function first populates the NewScanRequestPB inside 'next_req_' from
// the scan configuration (row format flags, read mode and timestamps, order
// mode, last primary key, limit, block caching, propagated timestamp,
// predicates, primary key bounds and projection). It then loops: look up the
// tablet through the meta cache, pick a tablet server, and send the scan RPC,
// until the RPC succeeds or an error is returned.
//
// Parameters:
//   partition_key  key handed to the meta cache lookup (kLowerBound lookup).
//   deadline       passed to the tablet lookup, the scan RPC and the error
//                  handler; also bounds the back-off sleep between retries
//                  when no tablet server is currently available.
//   blacklist      set handed to tablet server selection and to HandleError();
//                  cleared here when server selection reports
//                  ServiceUnavailable. Must not be null.
//
// Returns:
//   Status::OK() when the scan RPC succeeded. Status::OK() is also returned
//   without sending any RPC when the lookup reports NotFound or when the
//   located tablet lies past 'partition_key' and the partition pruner says
//   it should be pruned. Otherwise the first non-OK status from building the
//   projection, the lookup, server selection or HandleError() is returned.
Status KuduScanner::Data::OpenTablet(const PartitionKey& partition_key,
                                     const MonoTime& deadline,
                                     set<string>* blacklist) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, 500, "opening tablet");

  PrepareRequest(KuduScanner::Data::NEW);
  next_req_.clear_scanner_id();
  NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
  scan->set_row_format_flags(configuration_.row_format_flags());
  const KuduScanner::ReadMode read_mode = configuration_.read_mode();
  switch (read_mode) {
    case KuduScanner::READ_LATEST:
      scan->set_read_mode(kudu::READ_LATEST);
      if (configuration_.has_snapshot_timestamp()) {
        LOG(FATAL) << "Snapshot timestamp should only be configured "
                      "for READ_AT_SNAPSHOT scan mode.";
      }
      break;
    case KuduScanner::READ_AT_SNAPSHOT:
      scan->set_read_mode(kudu::READ_AT_SNAPSHOT);
      if (configuration_.has_start_timestamp()) {
        scan->set_snap_start_timestamp(configuration_.start_timestamp());
      }
      if (configuration_.has_snapshot_timestamp()) {
        scan->set_snap_timestamp(configuration_.snapshot_timestamp());
      }
      break;
    case KuduScanner::READ_YOUR_WRITES:
      scan->set_read_mode(kudu::READ_YOUR_WRITES);
      if (configuration_.has_snapshot_timestamp()) {
        LOG(FATAL) << "Snapshot timestamp should only be configured "
                      "for READ_AT_SNAPSHOT scan mode.";
      }
      break;
    default:
      LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
  }

  if (configuration_.is_fault_tolerant()) {
    scan->set_order_mode(kudu::ORDERED);
  } else {
    scan->set_order_mode(kudu::UNORDERED);
  }

  if (last_primary_key_.length() > 0) {
    VLOG(2) << "Setting NewScanRequestPB last_primary_key to hex value "
        << HexDump(last_primary_key_);
    scan->set_last_primary_key(last_primary_key_);
  }

  if (configuration_.spec().has_limit()) {
    // Set the limit based on the number of rows we've already returned,
    // never going below zero.
    int64_t new_limit = std::max(configuration_.spec().limit() - num_rows_returned_,
                                 static_cast<int64_t>(0));
    VLOG(2) << "Setting NewScanRequestPB limit " << new_limit;
    scan->set_limit(new_limit);
  }

  scan->set_cache_blocks(configuration_.spec().cache_blocks());

  // For consistent operations, propagate the timestamp among all operations
  // performed in the context of the same client. For READ_YOUR_WRITES scan,
  // use the propagation timestamp from the scan config (if one is set).
  uint64_t propagated_ts = KuduClient::kNoTimestamp;
  if (read_mode == KuduScanner::READ_YOUR_WRITES) {
    if (configuration_.has_lower_bound_propagation_timestamp()) {
      propagated_ts = configuration_.lower_bound_propagation_timestamp();
    }
  } else {
    propagated_ts = table_->client()->data_->GetLatestObservedTimestamp();
  }
  if (propagated_ts != KuduClient::kNoTimestamp) {
    scan->set_propagated_timestamp(propagated_ts);
  }

  // Set up the predicates.
  scan->clear_column_predicates();
  for (const auto& col_pred : configuration_.spec().predicates()) {
    ColumnPredicateToPB(col_pred.second, scan->add_column_predicates());
  }

  // Copy the encoded primary key bounds, clearing any stale value left in the
  // request when the corresponding bound is not configured.
  if (configuration_.spec().lower_bound_key()) {
    scan->mutable_start_primary_key()->assign(
      reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
      configuration_.spec().lower_bound_key()->encoded_key().size());
  } else {
    scan->clear_start_primary_key();
  }
  if (configuration_.spec().exclusive_upper_bound_key()) {
    scan->mutable_stop_primary_key()->assign(reinterpret_cast<const char*>(
          configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
      configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
  } else {
    scan->clear_stop_primary_key();
  }
  RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), scan->mutable_projected_columns(),
                                  SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));

  // Retry loop: each iteration re-resolves the tablet, re-selects a tablet
  // server and re-sends the scan RPC. 'attempt' only drives the back-off used
  // when no tablet server is available.
  for (int attempt = 1;; attempt++) {
    Synchronizer sync;
    table_->client()->data_->meta_cache_->LookupTabletByKey(
        table_.get(),
        partition_key,
        deadline,
        internal::MetaCache::LookupType::kLowerBound,
        &remote_,
        sync.AsStatusCallback());
    Status s = sync.Wait();
    if (s.IsNotFound()) {
      // No more tablets in the table.
      partition_pruner_.RemovePartitionKeyRange({});
      DCHECK(!partition_pruner_.HasMorePartitionKeyRanges());
      return Status::OK();
    }
    RETURN_NOT_OK(s);

    // Check if the meta cache returned a tablet covering a partition key range past
    // what we asked for. This can happen if the requested partition key falls
    // in a non-covered range. In this case we can potentially prune the tablet.
    if (partition_key < remote_->partition().begin() &&
        partition_pruner_.ShouldPrune(remote_->partition())) {
      partition_pruner_.RemovePartitionKeyRange(remote_->partition().end());
      return Status::OK();
    }

    scan->set_tablet_id(remote_->tablet_id());

    // Initialized to nullptr so that a selection call which fails to set the
    // output is caught by CHECK_NOTNULL below instead of reading garbage.
    RemoteTabletServer* tablet_server = nullptr;
    vector<RemoteTabletServer*> candidates;
    Status lookup_status = table_->client()->data_->GetTabletServer(
        table_->client(),
        remote_,
        configuration_.selection(),
        *blacklist,
        &candidates,
        &tablet_server);
    // If we get ServiceUnavailable, this indicates that the tablet doesn't
    // currently have any known leader. We should sleep and retry, since
    // it's likely that the tablet is undergoing a leader election and will
    // soon have one.
    if (lookup_status.IsServiceUnavailable()) {
      const MonoTime now = MonoTime::Now();
      if (now < deadline) {
        // ServiceUnavailable means that we have already blacklisted all the candidate
        // tablet servers. So, we clear the list so that we will cycle through them all
        // another time.
        blacklist->clear();
        // Linear back-off, clamped so that the sleep never runs past the
        // provided deadline.
        MonoDelta backoff = MonoDelta::FromMilliseconds(attempt * 100);
        const MonoDelta time_left = deadline - now;
        if (time_left < backoff) {
          backoff = time_left;
        }
        VLOG(1) << "Tablet " << remote_->tablet_id() << " currently unavailable: "
                << lookup_status.ToString() << ". Sleeping for "
                << backoff.ToMilliseconds() << "ms "
                << "and retrying...";
        SleepFor(backoff);
        continue;
      }
    }
    RETURN_NOT_OK(lookup_status);
    // Verify the pointer itself before touching the proxy through it.
    ts_ = CHECK_NOTNULL(tablet_server);
    CHECK(ts_->proxy());
    proxy_ = ts_->proxy();

    // True only when there are more candidates than the blacklisted servers
    // plus the one about to be tried.
    bool allow_time_for_failover = candidates.size() > blacklist->size() + 1;
    ScanRpcStatus scan_status = SendScanRpc(deadline, allow_time_for_failover);
    if (scan_status.result == ScanRpcStatus::OK) {
      last_error_ = Status::OK();
      scan_attempts_ = 0;
      break;
    }
    scan_attempts_++;
    // A non-OK status from HandleError() aborts the open; OK means retry.
    RETURN_NOT_OK(HandleError(scan_status, deadline, blacklist, /* needs_reopen=*/ nullptr));
  }

  partition_pruner_.RemovePartitionKeyRange(remote_->partition().end());

  next_req_.clear_new_scan_request();
  data_in_open_ = (last_response_.has_data() && last_response_.data().num_rows() > 0) ||
      (last_response_.has_columnar_data() && last_response_.columnar_data().num_rows() > 0);
  if (last_response_.has_more_results()) {
    next_req_.set_scanner_id(last_response_.scanner_id());
    VLOG(2) << "Opened tablet " << remote_->tablet_id()
            << ", scanner ID " << last_response_.scanner_id();
  } else if (last_response_.has_data() || last_response_.has_columnar_data()) {
    VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned, "
            << " data_in_open=" << data_in_open_;
  } else {
    VLOG(2) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned";
  }

  // For fault-tolerant scans, remember the encoded last primary key if the
  // response carries one. It is sent back in 'last_primary_key' of the next
  // new-scan request built by this function.
  if (configuration().is_fault_tolerant()) {
    if (last_response_.has_last_primary_key()) {
      last_primary_key_ = last_response_.last_primary_key();
    }
  }

  if (configuration_.read_mode() == KuduScanner::READ_AT_SNAPSHOT &&
      !configuration_.has_snapshot_timestamp()) {
    // There must be a snapshot timestamp returned by the tablet server:
    // it's the first response from the tablet server when scanning in the
    // READ_AT_SNAPSHOT mode with unspecified snapshot timestamp.
    CHECK(last_response_.has_snap_timestamp());
    configuration_.SetSnapshotRaw(last_response_.snap_timestamp());
  }

  // For READ_YOUR_WRITES mode, update the latest observed timestamp with
  // the chosen snapshot timestamp sent back from the server, to avoid
  // unnecessary waits for subsequent reads.
  if (configuration_.read_mode() == KuduScanner::READ_YOUR_WRITES) {
    CHECK(last_response_.has_snap_timestamp());
    table_->client()->data_->UpdateLatestObservedTimestamp(
        last_response_.snap_timestamp());
  } else if (last_response_.has_propagated_timestamp()) {
    table_->client()->data_->UpdateLatestObservedTimestamp(
        last_response_.propagated_timestamp());
  }

  return Status::OK();
}