in src/kudu/tserver/tablet_service.cc [2289:2454]
void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
SplitKeyRangeResponsePB* resp,
RpcContext* context) {
TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange",
"tablet_id", req->tablet_id());
DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req);
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
context, &replica)) {
return;
}
if (FLAGS_tserver_enforce_access_control) {
TokenPB token;
if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
return;
}
const auto& privilege = token.authz().table_privilege();
if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
"SplitKeyRange", context)) {
return;
}
// Split-key requests require:
// if uses pk (e.g. primary key fields set):
// foreach(primary key column): SCAN ON COLUMN
// foreach(requested column): SCAN ON COLUMN
//
// If the privilege doesn't have full scan privileges, or column-level scan
// privileges, the user is definitely not authorized to perform a scan.
unordered_set<ColumnId> authorized_column_ids;
if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "SplitKeyRange",
&authorized_column_ids, context)) {
return;
}
if (!privilege.scan_privilege()) {
const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
const Schema& schema = *schema_ptr;
unordered_set<ColumnId> required_column_privileges;
if (req->has_start_primary_key() || req->has_stop_primary_key()) {
const auto& key_cols = schema.get_key_column_ids();
required_column_privileges.insert(key_cols.begin(), key_cols.end());
}
bool is_authorized = true;
const string rejection_prefix = Substitute("rejecting SplitKeyRange request from $0",
context->requestor_string());
for (int i = 0; i < req->columns_size(); i++) {
const auto& column_name = req->columns(i).name();
int col_idx = schema.find_column(req->columns(i).name());
if (col_idx == Schema::kColumnNotFound) {
LOG(WARNING) << Substitute("$0: no column named '$1'", rejection_prefix, column_name);
is_authorized = false;
break;
}
EmplaceIfNotPresent(&required_column_privileges, schema.column_id(col_idx));
}
for (const auto& required_col_id : required_column_privileges) {
if (!ContainsKey(authorized_column_ids, required_col_id)) {
LOG(WARNING) << Substitute("$0: authz token doesn't authorize column ID $1",
rejection_prefix, required_col_id);
is_authorized = false;
break;
}
}
if (!is_authorized) {
context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
Status::NotAuthorized("not authorized to SplitKeyRange"));
return;
}
}
}
shared_ptr<Tablet> tablet;
TabletServerErrorPB::Code error_code;
Status s = GetTabletRef(replica, &tablet, &error_code);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
return;
}
// Decode encoded key
Arena arena(256);
const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
const Schema& tablet_schema = *tablet_schema_ptr;
EncodedKey* start = nullptr;
EncodedKey* stop = nullptr;
if (req->has_start_primary_key()) {
s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->start_primary_key(), &start);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid SplitKeyRange start primary key"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
if (req->has_stop_primary_key()) {
s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->stop_primary_key(), &stop);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid SplitKeyRange stop primary key"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
if (req->has_start_primary_key() && req->has_stop_primary_key()) {
// Validate the start key is less than the stop key, if they are both set
if (start->encoded_key() > stop->encoded_key()) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid primary key range"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
// Validate the column are valid
Schema schema;
s = ColumnPBsToSchema(req->columns(), &schema);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(),
s,
TabletServerErrorPB::INVALID_SCHEMA,
context);
return;
}
if (schema.has_column_ids()) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("User requests should not have Column IDs"),
TabletServerErrorPB::INVALID_SCHEMA,
context);
return;
}
vector<ColumnId> column_ids;
for (const ColumnSchema& column : schema.columns()) {
int column_idx = tablet_schema.find_column(column.name());
if (PREDICT_FALSE(column_idx == Schema::kColumnNotFound)) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument(
"Invalid SplitKeyRange column name", column.name()),
TabletServerErrorPB::INVALID_SCHEMA,
context);
return;
}
column_ids.emplace_back(tablet_schema.column_id(column_idx));
}
// Validate the target chunk size are valid
if (req->target_chunk_size_bytes() == 0) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid SplitKeyRange target chunk size"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
vector<KeyRange> ranges;
tablet->SplitKeyRange(start, stop, column_ids, req->target_chunk_size_bytes(), &ranges);
for (const auto& range : ranges) {
range.ToPB(resp->add_ranges());
}
context->RespondSuccess();
}