in src/kudu/rpc/server_negotiation.cc [435:578]
// Handles the client's NEGOTIATE step message.
//
// Records the RPC feature flags advertised by the client, determines which
// authentication types both sides can use, selects one (certificate is
// preferred over token, which is preferred over SASL), and replies with a
// NEGOTIATE response listing the server's feature flags, the selected
// authentication type and, for SASL, the enabled mechanisms.
//
// Whenever the request is rejected, an error is sent to the client with
// SendError() and the rejecting Status is returned (RETURN_NOT_OK propagates
// the SendError() failure instead if the error could not be sent). On success
// the result of SendNegotiatePB() is returned.
Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
  if (request.step() != NegotiatePB::NEGOTIATE) {
    Status s = Status::NotAuthorized("expected NEGOTIATE step",
                                     NegotiatePB::NegotiateStep_Name(request.step()));
    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
    return s;
  }
  TRACE("Received NEGOTIATE request from client");

  // Fill in the set of features supported by the client.
  for (int flag : request.supported_features()) {
    // We only add the features that our local build knows about.
    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
        static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
    if (feature_flag != UNKNOWN) {
      client_features_.insert(feature_flag);
    }
  }

  // If encryption is mandatory, a client that did not advertise TLS is rejected.
  if (encryption_ == RpcEncryption::REQUIRED &&
      !ContainsKey(client_features_, RpcFeatureFlag::TLS)) {
    Status s = Status::NotAuthorized("client does not support required TLS encryption");
    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
    return s;
  }

  // Find the set of mutually supported authentication types.
  set<AuthenticationType> authn_types;
  if (request.authn_types().empty()) {
    // If the client doesn't send any supported authentication types, we assume
    // support for SASL. This preserves backwards compatibility with clients who
    // don't support security features.
    authn_types.insert(AuthenticationType::SASL);
  } else {
    for (const auto& type : request.authn_types()) {
      switch (type.type_case()) {
        case AuthenticationTypePB::kSasl:
          authn_types.insert(AuthenticationType::SASL);
          break;
        case AuthenticationTypePB::kToken:
          authn_types.insert(AuthenticationType::TOKEN);
          break;
        case AuthenticationTypePB::kCertificate:
          // We only provide authenticated TLS if the certificates are generated
          // by the internal CA.
          // However, for MySQL Raft we provide a backdoor to bypass this Kudu
          // restriction: FLAGS_rpc_allow_external_cert_authentication = true
          // lifts the limitation.
          if (FLAGS_rpc_allow_external_cert_authentication ||
              !tls_context_->is_external_cert()) {
            authn_types.insert(AuthenticationType::CERTIFICATE);
          }
          break;
        case AuthenticationTypePB::TYPE_NOT_SET: {
          // The client advertised a type this build does not recognize; it is
          // skipped, and a rate-limited warning records the peer address.
          Sockaddr addr;
          RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
          KLOG_EVERY_N_SECS(WARNING, 60)
              << "client supports unknown authentication type, consider updating server, address [EVERY 60 seconds]: "
              << addr.ToString();
          break;
        }
      }
    }
    if (authn_types.empty()) {
      Status s = Status::NotSupported("no mutually supported authentication types");
      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
      return s;
    }
  }

  // This is the key rule check which prioritizes TLS-based authentication
  // over SASL-based. If the client supports certificate-based auth and
  // the server has certificate-based auth and encryption (consequently TLS)
  // is not disabled, we should prioritize TLS authentication as the method.
  if (encryption_ != RpcEncryption::DISABLED &&
      ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
      tls_context_->has_signed_cert()) {
    // If the client supports it and we are locally configured with TLS and have
    // a CA-signed cert, choose cert authn.
    // TODO(KUDU-1924): consider adding the fingerprint of the CA cert which signed
    // the client's cert to the authentication message.
    negotiated_authn_ = AuthenticationType::CERTIFICATE;
  } else if (ContainsKey(authn_types, AuthenticationType::TOKEN) &&
             token_verifier_->GetMaxKnownKeySequenceNumber() >= 0 &&
             encryption_ != RpcEncryption::DISABLED &&
             tls_context_->has_signed_cert()) {
    // If the client supports it, we have a TSK to verify the client's token,
    // and we have a signed-cert so the client can verify us, choose token authn.
    // TODO(KUDU-1924): consider adding the TSK sequence number to the authentication
    // message.
    negotiated_authn_ = AuthenticationType::TOKEN;
  } else if (ContainsKey(authn_types, AuthenticationType::SASL)) {
    // Otherwise fall back to SASL, provided the client advertised it.
    negotiated_authn_ = AuthenticationType::SASL;
  } else {
    // The client advertised only certificate and/or token authentication and
    // neither could be selected above. The advertised set comes from the
    // remote peer, so this must be a runtime rejection rather than a
    // debug-only assertion: selecting SASL here would answer with a type the
    // client never offered.
    Status s = Status::NotSupported("no mutually supported authentication types");
    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
    return s;
  }

  // Fill in the NEGOTIATE step response for the client.
  NegotiatePB response;
  response.set_step(NegotiatePB::NEGOTIATE);

  // Tell the client which features we support.
  server_features_ = kSupportedServerRpcFeatureFlags;
  if (tls_context_->has_cert() && encryption_ != RpcEncryption::DISABLED) {
    server_features_.insert(TLS);
    // If the remote peer is local, then we allow using TLS for authentication
    // without encryption or integrity.
    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
      server_features_.insert(TLS_AUTHENTICATION_ONLY);
    }
  }
  for (RpcFeatureFlag feature : server_features_) {
    response.add_supported_features(feature);
  }

  // Report the single authentication type that was selected above.
  switch (negotiated_authn_) {
    case AuthenticationType::CERTIFICATE:
      response.add_authn_types()->mutable_certificate();
      break;
    case AuthenticationType::TOKEN:
      response.add_authn_types()->mutable_token();
      break;
    case AuthenticationType::SASL: {
      response.add_authn_types()->mutable_sasl();
      const set<SaslMechanism::Type>& server_mechs = helper_.EnabledMechs();
      if (PREDICT_FALSE(server_mechs.empty())) {
        // This will happen if no mechanisms are enabled before calling Init()
        Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
        LOG(ERROR) << s.ToString();
        TRACE("Sending FATAL_UNAUTHORIZED response to client");
        RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
        return s;
      }
      // Advertise every enabled SASL mechanism by name.
      for (auto mechanism : server_mechs) {
        response.add_sasl_mechanisms()->set_mechanism(SaslMechanism::name_of(mechanism));
      }
      break;
    }
    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
  }

  return SendNegotiatePB(response);
}