void TabletServiceImpl::SplitKeyRange()

in src/kudu/tserver/tablet_service.cc [2289:2454]


void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
                                      SplitKeyRangeResponsePB* resp,
                                      RpcContext* context) {
  TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange",
               "tablet_id", req->tablet_id());
  DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req);

  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
                                           context, &replica)) {
    return;
  }

  if (FLAGS_tserver_enforce_access_control) {
    TokenPB token;
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "SplitKeyRange", context)) {
      return;
    }
    // Split-key requests require:
    //   if uses pk (e.g. primary key fields set):
    //     foreach(primary key column): SCAN ON COLUMN
    //   foreach(requested column): SCAN ON COLUMN
    //
    // If the privilege doesn't have full scan privileges, or column-level scan
    // privileges, the user is definitely not authorized to perform a scan.
    unordered_set<ColumnId> authorized_column_ids;
    if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "SplitKeyRange",
                                             &authorized_column_ids, context)) {
      return;
    }
    if (!privilege.scan_privilege()) {
      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
      const Schema& schema = *schema_ptr;
      unordered_set<ColumnId> required_column_privileges;
      if (req->has_start_primary_key() || req->has_stop_primary_key()) {
        const auto& key_cols = schema.get_key_column_ids();
        required_column_privileges.insert(key_cols.begin(), key_cols.end());
      }
      bool is_authorized = true;
      const string rejection_prefix = Substitute("rejecting SplitKeyRange request from $0",
                                                 context->requestor_string());
      for (int i = 0; i < req->columns_size(); i++) {
        const auto& column_name = req->columns(i).name();
        int col_idx = schema.find_column(req->columns(i).name());
        if (col_idx == Schema::kColumnNotFound) {
          LOG(WARNING) << Substitute("$0: no column named '$1'", rejection_prefix, column_name);
          is_authorized = false;
          break;
        }
        EmplaceIfNotPresent(&required_column_privileges, schema.column_id(col_idx));
      }
      for (const auto& required_col_id : required_column_privileges) {
        if (!ContainsKey(authorized_column_ids, required_col_id)) {
          LOG(WARNING) << Substitute("$0: authz token doesn't authorize column ID $1",
                                     rejection_prefix, required_col_id);
          is_authorized = false;
          break;
        }
      }
      if (!is_authorized) {
        context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
            Status::NotAuthorized("not authorized to SplitKeyRange"));
        return;
      }
    }
  }

  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }

  // Decode encoded key
  Arena arena(256);
  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
  const Schema& tablet_schema = *tablet_schema_ptr;
  EncodedKey* start = nullptr;
  EncodedKey* stop = nullptr;
  if (req->has_start_primary_key()) {
    s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->start_primary_key(), &start);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument("Invalid SplitKeyRange start primary key"),
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }
  if (req->has_stop_primary_key()) {
    s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->stop_primary_key(), &stop);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument("Invalid SplitKeyRange stop primary key"),
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }
  if (req->has_start_primary_key() && req->has_stop_primary_key()) {
    // Validate the start key is less than the stop key, if they are both set
    if (start->encoded_key() > stop->encoded_key()) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument("Invalid primary key range"),
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }

  // Validate the column are valid
  Schema schema;
  s = ColumnPBsToSchema(req->columns(), &schema);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(),
                         s,
                         TabletServerErrorPB::INVALID_SCHEMA,
                         context);
    return;
  }
  if (schema.has_column_ids()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("User requests should not have Column IDs"),
                         TabletServerErrorPB::INVALID_SCHEMA,
                         context);
    return;
  }

  vector<ColumnId> column_ids;
  for (const ColumnSchema& column : schema.columns()) {
    int column_idx = tablet_schema.find_column(column.name());
    if (PREDICT_FALSE(column_idx == Schema::kColumnNotFound)) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument(
                               "Invalid SplitKeyRange column name", column.name()),
                           TabletServerErrorPB::INVALID_SCHEMA,
                           context);
      return;
    }
    column_ids.emplace_back(tablet_schema.column_id(column_idx));
  }

  // Validate the target chunk size are valid
  if (req->target_chunk_size_bytes() == 0) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Invalid SplitKeyRange target chunk size"),
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }

  vector<KeyRange> ranges;
  tablet->SplitKeyRange(start, stop, column_ids, req->target_chunk_size_bytes(), &ranges);
  for (const auto& range : ranges) {
    range.ToPB(resp->add_ranges());
  }

  context->RespondSuccess();
}