in src/kudu/tserver/tablet_service.cc [1538:1730]
// Handles a Write RPC targeting a single tablet.
//
// The request goes through the following checks in order; each failing check
// responds to the client with an error and returns early:
//   1. the target tablet replica must be running;
//   2. if --tserver_enforce_access_control is set, the authz token must be
//      valid, scoped to this tablet's table, and grant at least one of the
//      INSERT/UPDATE/DELETE privileges (finer-grained checks are deferred to
//      the prepare phase of the op, after the operations are decoded);
//   3. the clock must support the requested external consistency mode;
//   4. the tablet must not throttle the request's payload (row data plus
//      indirect data bytes);
//   5. the process must not have exceeded its soft memory limit;
//   6. if the tablet apply pool's queue is overloaded, the request is rejected
//      with a probability that grows with how long the queue has been
//      overloaded.
// An admitted request is turned into a WriteOpState, the clock is advanced
// past any client-propagated timestamp, and the op is submitted either as a
// regular write or -- when the request carries a txn_id and transactional
// write handling is enabled -- as a transactional write. On successful
// submission the RPC is responded to asynchronously by the op's completion
// callback; if submission itself fails, an error is responded here.
void TabletServiceImpl::Write(const WriteRequestPB* req,
                              WriteResponsePB* resp,
                              RpcContext* context) {
  const auto& tablet_id = req->tablet_id();
  TRACE_EVENT1("tserver", "TabletServiceImpl::Write",
               "tablet_id", tablet_id);
  DVLOG(3) << Substitute("Received Write RPC: $0, requestor: $1, request id: $2",
                         SecureDebugString(*req), context->requestor_string(),
                         context->request_id() == nullptr ?
                             "" : SecureDebugString(*context->request_id()));
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(
          server_->tablet_manager(), tablet_id, resp, context, &replica)) {
    return;
  }
  optional<WriteAuthorizationContext> authz_context;
  if (FLAGS_tserver_enforce_access_control) {
    TokenPB token;
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "Write", context)) {
      return;
    }
    WritePrivileges privileges;
    if (privilege.insert_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::INSERT);
    }
    if (privilege.update_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::UPDATE);
    }
    if (privilege.delete_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::DELETE);
    }
    if (privileges.empty()) {
      // If we know there are no write-related privileges outright, we can
      // short-circuit further checking and reject the request immediately.
      // Otherwise, we'll defer the checking to the prepare phase of the
      // op after decoding the operations.
      static const auto kStatus = Status::NotAuthorized("not authorized to write");
      const auto msg = Substitute(
          "rejecting write request: no write privileges ($0)",
          context->requestor_string());
      KLOG_EVERY_N_SECS(INFO, 1) << msg << THROTTLE_MSG;
      return context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
                                        kStatus);
    }
    authz_context = WriteAuthorizationContext{ privileges, /*requested_op_types=*/{} };
  }
  if (PREDICT_FALSE(!server_->clock()->SupportsExternalConsistencyMode(
          req->external_consistency_mode()))) {
    constexpr const char* const kMsg =
        "rejecting write request: required consistency mode unsupported by clock";
    static const auto kStatus = Status::NotSupported(kMsg);
    KLOG_EVERY_N_SECS(INFO, 1) << kMsg << THROTTLE_MSG;
    return SetupErrorAndRespond(resp->mutable_error(),
                                kStatus,
                                TabletServerErrorPB::UNKNOWN_ERROR,
                                context);
  }
  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }
  const uint64_t bytes = req->row_operations().rows().size() +
      req->row_operations().indirect_data().size();
  if (!tablet->ShouldThrottleAllow(bytes)) {
    constexpr const char* const kMsg = "rejecting write request: throttled";
    static const auto kStatus = Status::ServiceUnavailable(kMsg);
    KLOG_EVERY_N_SECS(INFO, 1) << kMsg << THROTTLE_MSG;
    return SetupErrorAndRespond(resp->mutable_error(),
                                kStatus,
                                TabletServerErrorPB::THROTTLED,
                                context);
  }
  // Check for memory pressure; don't bother doing any additional work if we've
  // exceeded the limit.
  double capacity_pct;
  if (process_memory::SoftLimitExceeded(&capacity_pct)) {
    constexpr const char* const kMsg =
        "rejecting write request: soft memory limit exceeded";
    static const auto kStatus = Status::ServiceUnavailable(kMsg);
    tablet->metrics()->leader_memory_pressure_rejections->Increment();
    string msg = StringPrintf("%s (at %.2f%% of capacity)", kMsg, capacity_pct);
    if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
      KLOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;
    } else {
      KLOG_EVERY_N_SECS(INFO, 1) << msg << THROTTLE_MSG;
    }
    return SetupErrorAndRespond(resp->mutable_error(),
                                kStatus,
                                TabletServerErrorPB::THROTTLED,
                                context);
  }
  // If the apply queue is overloaded, the write request might be rejected.
  // The longer the queue was in overloaded state, the higher the probability
  // of rejecting the request.
  MonoDelta queue_otime;
  MonoDelta threshold;
  if (server_->tablet_apply_pool()->QueueOverloaded(&queue_otime, &threshold)) {
    DCHECK(threshold.Initialized());
    DCHECK_GT(threshold.ToMilliseconds(), 0);
    const auto overload_threshold_ms = threshold.ToMilliseconds();
    // The longer the queue has been in the overloaded state, the higher the
    // probability of an op to be rejected: an op is admitted with probability
    // 1/(time_factor^2 + 1).
    auto time_factor = queue_otime.ToMilliseconds() / overload_threshold_ms + 1;
    // Cap the factor so that time_factor^2 + 1 always fits into a 32-bit
    // signed integer, no matter how long the queue has stayed overloaded.
    // Without the cap, a long-lasting overload could make the argument of
    // OneIn() wrap around to a small or negative value if OneIn() takes a
    // 32-bit integer (NOTE(review): confirm its parameter type), which would
    // effectively stop rejecting requests. At the cap, the admission
    // probability is already below 1e-9.
    constexpr decltype(time_factor) kMaxTimeFactor = 46340;
    if (time_factor > kMaxTimeFactor) {
      time_factor = kMaxTimeFactor;
    }
    if (!rng_.OneIn(time_factor * time_factor + 1)) {
      constexpr const char* const kMsg =
          "rejecting write request: apply queue overloaded";
      static const auto kStatus = Status::ServiceUnavailable(kMsg);
      num_op_apply_queue_rejections_->Increment();
      KLOG_EVERY_N_SECS(INFO, 1) << kMsg << THROTTLE_MSG;
      return SetupErrorAndRespond(resp->mutable_error(),
                                  kStatus,
                                  TabletServerErrorPB::THROTTLED,
                                  context);
    }
  }
  unique_ptr<WriteOpState> op_state(new WriteOpState(
      replica.get(),
      req,
      context->AreResultsTracked() ? context->request_id() : nullptr,
      resp,
      std::move(authz_context)));
  // If the client sent us a timestamp, decode it and update the clock so that all future
  // timestamps are greater than the passed timestamp.
  if (req->has_propagated_timestamp()) {
    Timestamp ts(req->propagated_timestamp());
    s = server_->clock()->Update(ts);
    if (PREDICT_FALSE(!s.ok())) {
      return SetupErrorAndRespond(
          resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
    }
  }
  const auto deadline = context->GetClientDeadline();
  const auto& username = context->remote_user().username();
  if (!req->has_txn_id() ||
      PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) {
    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
        new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
    // Submit the write operation. The RPC will be responded asynchronously.
    s = replica->SubmitWrite(std::move(op_state), deadline);
  } else {
    if (!FLAGS_enable_txn_system_client_init) {
      return SetupErrorAndRespond(
          resp->mutable_error(),
          Status::NotSupported(Substitute("txns not supported on server $0",
                                          replica->permanent_uuid())),
          TabletServerErrorPB::UNKNOWN_ERROR, context);
    }
    // The abort functor is stored in the op's completion callback and so may
    // run after this method has returned. Capture the username by value so
    // the functor doesn't depend on the lifetime of the RpcContext that owns
    // the original string.
    auto abort_func = [this, txn_id = req->txn_id(), txn_user = username] {
      return server_->tablet_manager()->ScheduleAbortTxn(txn_id, txn_user);
    };
    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
        new TxnWriteCompletionCallback(context, resp, std::move(abort_func))));
    // If it's a write operation in the context of a multi-row transaction,
    // schedule running preliminary tasks if necessary: register the tablet as
    // a participant in the transaction and begin transaction for the
    // participating tablet.
    //
    // This functor is to schedule preliminary tasks prior to submitting
    // the write operation via TabletReplica::SubmitWrite().
    const auto scheduler = [this, &username, replica, deadline](
        int64_t txn_id, tablet::RegisteredTxnCallback began_txn_cb) {
      // 'replica' is a const member of this non-mutable lambda, so it is
      // passed as-is: std::move() on it would silently degrade to a copy.
      return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite(
          replica, txn_id, username, deadline, std::move(began_txn_cb));
    };
    s = replica->SubmitTxnWrite(std::move(op_state), scheduler);
    VLOG(2) << Substitute("submitting txn write op: $0", s.ToString());
  }
  // Check that we could submit the write
  if (PREDICT_FALSE(!s.ok())) {
    return SetupErrorAndRespond(
        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
  }
}