Status ServerNegotiation::HandleNegotiate()

in src/kudu/rpc/server_negotiation.cc [435:578]


Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
  if (request.step() != NegotiatePB::NEGOTIATE) {
    Status s = Status::NotAuthorized("expected NEGOTIATE step",
                                     NegotiatePB::NegotiateStep_Name(request.step()));
    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
    return s;
  }
  TRACE("Received NEGOTIATE request from client");

  // Fill in the set of features supported by the client.
  for (int flag : request.supported_features()) {
    // We only add the features that our local build knows about.
    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
    if (feature_flag != UNKNOWN) {
      client_features_.insert(feature_flag);
    }
  }

  if (encryption_ == RpcEncryption::REQUIRED &&
      !ContainsKey(client_features_, RpcFeatureFlag::TLS)) {
    Status s = Status::NotAuthorized("client does not support required TLS encryption");
    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
    return s;
  }

  // Find the set of mutually supported authentication types.
  set<AuthenticationType> authn_types;
  if (request.authn_types().empty()) {
    // If the client doesn't send any support authentication types, we assume
    // support for SASL. This preserves backwards compatibility with clients who
    // don't support security features.
    authn_types.insert(AuthenticationType::SASL);
  } else {
    for (const auto& type : request.authn_types()) {
      switch (type.type_case()) {
        case AuthenticationTypePB::kSasl:
          authn_types.insert(AuthenticationType::SASL);
          break;
        case AuthenticationTypePB::kToken:
          authn_types.insert(AuthenticationType::TOKEN);
          break;
        case AuthenticationTypePB::kCertificate:
          // We only provide authenticated TLS if the certificates are generated
          // by the internal CA.
          // However for MySQL Raft we provide a backdoor to bypass this kudu restriction
          // FLAGS_rpc_allow_external_cert_authentication = true, bypasses this limitation
          if (FLAGS_rpc_allow_external_cert_authentication || !tls_context_->is_external_cert()) {
            authn_types.insert(AuthenticationType::CERTIFICATE);
          }
          break;
        case AuthenticationTypePB::TYPE_NOT_SET: {
          Sockaddr addr;
          RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
          KLOG_EVERY_N_SECS(WARNING, 60)
              << "client supports unknown authentication type, consider updating server, address [EVERY 60 seconds]: "
              << addr.ToString();
          break;
        }
      }
    }

    if (authn_types.empty()) {
      Status s = Status::NotSupported("no mutually supported authentication types");
      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
      return s;
    }
  }

  // This is the key rule check which prioritizes TLS Based authentication
  // over SASL based. If the client supports certificate based auth and
  // the server has certificate based auth and encryption (consequently TLS)
  // is not disabled, we should prioritize TLS authentication as the method.
  if (encryption_ != RpcEncryption::DISABLED &&
      ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
      tls_context_->has_signed_cert()) {
    // If the client supports it and we are locally configured with TLS and have
    // a CA-signed cert, choose cert authn.
    // TODO(KUDU-1924): consider adding the fingerprint of the CA cert which signed
    // the client's cert to the authentication message.
    negotiated_authn_ = AuthenticationType::CERTIFICATE;
  } else if (ContainsKey(authn_types, AuthenticationType::TOKEN) &&
             token_verifier_->GetMaxKnownKeySequenceNumber() >= 0 &&
             encryption_ != RpcEncryption::DISABLED &&
             tls_context_->has_signed_cert()) {
    // If the client supports it, we have a TSK to verify the client's token,
    // and we have a signed-cert so the client can verify us, choose token authn.
    // TODO(KUDU-1924): consider adding the TSK sequence number to the authentication
    // message.
    negotiated_authn_ = AuthenticationType::TOKEN;
  } else {
    // Otherwise we always can fallback to SASL.
    DCHECK(ContainsKey(authn_types, AuthenticationType::SASL));
    negotiated_authn_ = AuthenticationType::SASL;
  }

  // Fill in the NEGOTIATE step response for the client.
  NegotiatePB response;
  response.set_step(NegotiatePB::NEGOTIATE);

  // Tell the client which features we support.
  server_features_ = kSupportedServerRpcFeatureFlags;
  if (tls_context_->has_cert() && encryption_ != RpcEncryption::DISABLED) {
    server_features_.insert(TLS);
    // If the remote peer is local, then we allow using TLS for authentication
    // without encryption or integrity.
    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
      server_features_.insert(TLS_AUTHENTICATION_ONLY);
    }
  }

  for (RpcFeatureFlag feature : server_features_) {
    response.add_supported_features(feature);
  }

  switch (negotiated_authn_) {
    case AuthenticationType::CERTIFICATE:
      response.add_authn_types()->mutable_certificate();
      break;
    case AuthenticationType::TOKEN:
      response.add_authn_types()->mutable_token();
      break;
    case AuthenticationType::SASL: {
      response.add_authn_types()->mutable_sasl();
      const set<SaslMechanism::Type>& server_mechs = helper_.EnabledMechs();
      if (PREDICT_FALSE(server_mechs.empty())) {
        // This will happen if no mechanisms are enabled before calling Init()
        Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
        LOG(ERROR) << s.ToString();
        TRACE("Sending FATAL_UNAUTHORIZED response to client");
        RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
        return s;
      }

      for (auto mechanism : server_mechs) {
        response.add_sasl_mechanisms()->set_mechanism(SaslMechanism::name_of(mechanism));
      }
      break;
    }
    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
  }

  return SendNegotiatePB(response);
}