void TabletServiceImpl::Write()

in src/kudu/tserver/tablet_service.cc [1538:1730]


void TabletServiceImpl::Write(const WriteRequestPB* req,
                              WriteResponsePB* resp,
                              RpcContext* context) {
  const auto& tablet_id = req->tablet_id();
  TRACE_EVENT1("tserver", "TabletServiceImpl::Write",
               "tablet_id", tablet_id);
  DVLOG(3) << Substitute("Received Write RPC: $0, requestor: $1, request id: $2",
                        SecureDebugString(*req), context->requestor_string(),
                        context->request_id() == nullptr ?
                        "" : SecureDebugString(*context->request_id()));
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(
        server_->tablet_manager(), tablet_id, resp, context, &replica)) {
    return;
  }
  optional<WriteAuthorizationContext> authz_context;
  if (FLAGS_tserver_enforce_access_control) {
    TokenPB token;
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "Write", context)) {
      return;
    }
    WritePrivileges privileges;
    if (privilege.insert_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::INSERT);
    }
    if (privilege.update_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::UPDATE);
    }
    if (privilege.delete_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::DELETE);
    }
    if (privileges.empty()) {
      // If we know there are no write-related privileges outright, we can
      // short-circuit further checking and reject the request immediately.
      // Otherwise, we'll defer the checking to the prepare phase of the
      // op after decoding the operations.
      static const auto kStatus = Status::NotAuthorized("not authorized to write");
      const auto msg = Substitute(
          "rejecting write request: no write privileges ($0)",
          context->requestor_string());
      KLOG_EVERY_N_SECS(INFO, 1) << msg << THROTTLE_MSG;
      return context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
                                        kStatus);
    }
    authz_context = WriteAuthorizationContext{ privileges, /*requested_op_types=*/{} };
  }

  if (PREDICT_FALSE(!server_->clock()->SupportsExternalConsistencyMode(
      req->external_consistency_mode()))) {
    constexpr const char* const kMsg =
        "rejecting write request: required consistency mode unsupported by clock";
    static const auto kStatus = Status::NotSupported(kMsg);
    KLOG_EVERY_N_SECS(INFO, 1) << kMsg << THROTTLE_MSG;
    return SetupErrorAndRespond(resp->mutable_error(),
                                kStatus,
                                TabletServerErrorPB::UNKNOWN_ERROR,
                                context);
  }

  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }

  const uint64_t bytes = req->row_operations().rows().size() +
      req->row_operations().indirect_data().size();
  if (!tablet->ShouldThrottleAllow(bytes)) {
    constexpr const char* const kMsg = "rejecting write request: throttled";
    static const auto kStatus = Status::ServiceUnavailable(kMsg);
    KLOG_EVERY_N_SECS(INFO, 1) << kMsg << THROTTLE_MSG;
    return SetupErrorAndRespond(resp->mutable_error(),
                                kStatus,
                                TabletServerErrorPB::THROTTLED,
                                context);
  }

  // Check for memory pressure; don't bother doing any additional work if we've
  // exceeded the limit.
  double capacity_pct;
  if (process_memory::SoftLimitExceeded(&capacity_pct)) {
    constexpr const char* const kMsg =
        "rejecting write request: soft memory limit exceeded";
    static const auto kStatus = Status::ServiceUnavailable(kMsg);
    tablet->metrics()->leader_memory_pressure_rejections->Increment();
    string msg = StringPrintf("%s (at %.2f%% of capacity)", kMsg, capacity_pct);
    if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
      KLOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;
    } else {
      KLOG_EVERY_N_SECS(INFO, 1) << msg << THROTTLE_MSG;
    }
    return SetupErrorAndRespond(resp->mutable_error(),
                                kStatus,
                                TabletServerErrorPB::THROTTLED,
                                context);
  }

  // If the apply queue is overloaded, the write request might be rejected.
  // The longer the queue was in overloaded state, the higher the probability
  // of rejecting the request.
  MonoDelta queue_otime;
  MonoDelta threshold;
  if (server_->tablet_apply_pool()->QueueOverloaded(&queue_otime, &threshold)) {
    DCHECK(threshold.Initialized());
    DCHECK_GT(threshold.ToMilliseconds(), 0);
    auto overload_threshold_ms = threshold.ToMilliseconds();
    // The longer the queue has been in the overloaded state, the higher the
    // probability of an op to be rejected.
    auto time_factor = queue_otime.ToMilliseconds() / overload_threshold_ms + 1;
    if (!rng_.OneIn(time_factor * time_factor + 1)) {
      constexpr const char* const kMsg =
          "rejecting write request: apply queue overloaded";
      static const auto kStatus = Status::ServiceUnavailable(kMsg);
      num_op_apply_queue_rejections_->Increment();
      KLOG_EVERY_N_SECS(INFO, 1) << kMsg << THROTTLE_MSG;
      return SetupErrorAndRespond(resp->mutable_error(),
                                  kStatus,
                                  TabletServerErrorPB::THROTTLED,
                                  context);
    }
  }

  unique_ptr<WriteOpState> op_state(new WriteOpState(
      replica.get(),
      req,
      context->AreResultsTracked() ? context->request_id() : nullptr,
      resp,
      std::move(authz_context)));

  // If the client sent us a timestamp, decode it and update the clock so that all future
  // timestamps are greater than the passed timestamp.
  if (req->has_propagated_timestamp()) {
    Timestamp ts(req->propagated_timestamp());
    s = server_->clock()->Update(ts);
  }
  if (PREDICT_FALSE(!s.ok())) {
    return SetupErrorAndRespond(
        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
  }

  const auto deadline = context->GetClientDeadline();
  const auto& username = context->remote_user().username();

  if (!req->has_txn_id() ||
      PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) {
    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
        new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));

    // Submit the write operation. The RPC will be responded asynchronously.
    s = replica->SubmitWrite(std::move(op_state), deadline);
  } else {
    if (!FLAGS_enable_txn_system_client_init) {
      return SetupErrorAndRespond(
          resp->mutable_error(),
          Status::NotSupported(Substitute("txns not supported on server $0",
                                          replica->permanent_uuid())),
          TabletServerErrorPB::UNKNOWN_ERROR, context);
    }
    auto abort_func = [this, txn_id = req->txn_id(), &username] {
      return server_->tablet_manager()->ScheduleAbortTxn(txn_id, username);
    };
    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
        new TxnWriteCompletionCallback(context, resp, std::move(abort_func))));

    // If it's a write operation in the context of a multi-row transaction,
    // schedule running preliminary tasks if necessary: register the tablet as
    // a participant in the transaction and begin transaction for the
    // participating tablet.
    //
    // This functor is to schedule preliminary tasks prior to submitting
    // the write operation via TabletReplica::SubmitWrite().
    const auto scheduler = [this, &username, replica, deadline](
        int64_t txn_id, tablet::RegisteredTxnCallback began_txn_cb) {
      return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite(
          std::move(replica), txn_id, username, deadline, std::move(began_txn_cb));
    };
    s = replica->SubmitTxnWrite(std::move(op_state), scheduler);
    VLOG(2) << Substitute("submitting txn write op: $0", s.ToString());
  }

  // Check that we could submit the write
  if (PREDICT_FALSE(!s.ok())) {
    return SetupErrorAndRespond(
        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
  }
}