in src/kudu/rpc/server_negotiation.cc [454:615]
Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
if (request.step() != NegotiatePB::NEGOTIATE) {
Status s = Status::NotAuthorized("expected NEGOTIATE step",
NegotiatePB::NegotiateStep_Name(request.step()));
RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
return s;
}
TRACE("Received NEGOTIATE request from client");
// Fill in the set of features supported by the client.
for (int flag : request.supported_features()) {
// We only add the features that our local build knows about.
RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
if (feature_flag != UNKNOWN) {
client_features_.insert(feature_flag);
}
}
if (encryption_ == RpcEncryption::REQUIRED &&
!ContainsKey(client_features_, RpcFeatureFlag::TLS)) {
Status s = Status::NotAuthorized("client does not support required TLS encryption");
RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
return s;
}
// Find the set of mutually supported authentication types.
set<AuthenticationType> authn_types;
if (request.authn_types().empty()) {
// If the client doesn't send any support authentication types, we assume
// support for SASL. This preserves backwards compatibility with clients who
// don't support security features.
authn_types.insert(AuthenticationType::SASL);
} else {
for (const auto& type : request.authn_types()) {
switch (type.type_case()) {
case AuthenticationTypePB::kSasl:
authn_types.insert(AuthenticationType::SASL);
break;
case AuthenticationTypePB::kToken:
authn_types.insert(AuthenticationType::TOKEN);
break;
case AuthenticationTypePB::kCertificate:
// We only provide authenticated TLS if the certificates are generated
// by the internal CA.
if (!tls_context_->is_external_cert()) {
authn_types.insert(AuthenticationType::CERTIFICATE);
}
break;
case AuthenticationTypePB::kJwt:
if (jwt_verifier_) {
authn_types.insert(AuthenticationType::JWT);
}
break;
case AuthenticationTypePB::TYPE_NOT_SET:
default: {
Sockaddr addr;
const auto s = socket_->GetPeerAddress(&addr);
WARN_NOT_OK(s, "unable to get peer address");
constexpr const char* const kFormat =
"client at $0 supports unknown authentication type $1, consider updating server";
KLOG_EVERY_N_SECS(WARNING, 60)
<< Substitute(kFormat,
s.ok() ? addr.ToString() : "<unknown address>",
static_cast<uint32_t>(type.type_case()))
<< THROTTLE_MSG;
break;
}
}
}
if (authn_types.empty()) {
Status s = Status::NotSupported("no mutually supported authentication types");
RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
return s;
}
}
if (encryption_ != RpcEncryption::DISABLED &&
ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
tls_context_->has_signed_cert()) {
// If the client supports it and we are locally configured with TLS and have
// a CA-signed cert, choose cert authn.
// TODO(KUDU-1924): consider adding the fingerprint of the CA cert which signed
// the client's cert to the authentication message.
negotiated_authn_ = AuthenticationType::CERTIFICATE;
} else if (ContainsKey(authn_types, AuthenticationType::TOKEN) &&
token_verifier_->GetMaxKnownKeySequenceNumber() >= 0 &&
encryption_ != RpcEncryption::DISABLED &&
tls_context_->has_signed_cert()) {
// If the client supports it, we have a TSK to verify the client's token,
// and we have a signed-cert so the client can verify us, choose token authn.
// TODO(KUDU-1924): consider adding the TSK sequence number to the authentication
// message.
negotiated_authn_ = AuthenticationType::TOKEN;
} else if (ContainsKey(authn_types, AuthenticationType::JWT) &&
encryption_ != RpcEncryption::DISABLED &&
tls_context_->has_signed_cert()) {
// The client should send its JWT only to servers that it trusts because
// an untrusted server could be run by a malicious impostor who might steal
// the client's credentials. So, a benevolent server advertises its JWT
// authentication ability to clients only if it has a CA-signed TLS
// certificate that the client might verify against its bundle of trusted
// CA certificates.
negotiated_authn_ = AuthenticationType::JWT;
} else {
// Otherwise we always can fallback to SASL.
DCHECK(ContainsKey(authn_types, AuthenticationType::SASL));
negotiated_authn_ = AuthenticationType::SASL;
}
// Fill in the NEGOTIATE step response for the client.
NegotiatePB response;
response.set_step(NegotiatePB::NEGOTIATE);
// Tell the client which features we support.
server_features_ = kSupportedServerRpcFeatureFlags;
if (tls_context_->has_cert() && encryption_ != RpcEncryption::DISABLED) {
server_features_.insert(TLS);
// If the remote peer is local, then we allow using TLS for authentication
// without encryption or integrity.
if (socket_->IsLoopbackConnection() && !encrypt_loopback_) {
server_features_.insert(TLS_AUTHENTICATION_ONLY);
}
}
for (RpcFeatureFlag feature : server_features_) {
response.add_supported_features(feature);
}
switch (negotiated_authn_) {
case AuthenticationType::CERTIFICATE:
response.add_authn_types()->mutable_certificate();
break;
case AuthenticationType::TOKEN:
response.add_authn_types()->mutable_token();
break;
case AuthenticationType::SASL: {
response.add_authn_types()->mutable_sasl();
const set<SaslMechanism::Type>& server_mechs = helper_.EnabledMechs();
if (PREDICT_FALSE(server_mechs.empty())) {
// This will happen if no mechanisms are enabled before calling Init()
Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
LOG(ERROR) << s.ToString();
TRACE("Sending FATAL_UNAUTHORIZED response to client");
RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
return s;
}
for (auto mechanism : server_mechs) {
response.add_sasl_mechanisms()->set_mechanism(SaslMechanism::name_of(mechanism));
}
break;
}
case AuthenticationType::JWT:
response.add_authn_types()->mutable_jwt();
break;
case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
}
return SendNegotiatePB(response);
}