in core/src/main/scala/kafka/server/AdminManager.scala [1093:1231]
def alterUserScramCredentials(upsertions: Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion],
deletions: Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): AlterUserScramCredentialsResponseData = {
def scramMechanism(mechanism: Byte): ScramMechanism = {
ScramMechanism.fromType(mechanism)
}
def mechanismName(mechanism: Byte): String = {
scramMechanism(mechanism).mechanismName
}
val retval = new AlterUserScramCredentialsResponseData()
// fail any user that is invalid due to an empty user name, an unknown SCRAM mechanism, or unacceptable number of iterations
val maxIterations = 16384
val illegalUpsertions = upsertions.map(upsertion =>
if (upsertion.name.isEmpty)
requestStatus(upsertion.name, None, false, upsertion.iterations) // no determined mechanism -- empty user is the cause of failure
else {
val publicScramMechanism = scramMechanism(upsertion.mechanism)
if (publicScramMechanism == ScramMechanism.UNKNOWN) {
requestStatus(upsertion.name, Some(publicScramMechanism), false, upsertion.iterations) // unknown mechanism is the cause of failure
} else {
if (upsertion.iterations < InternalScramMechanism.forMechanismName(publicScramMechanism.mechanismName).minIterations
|| upsertion.iterations > maxIterations) {
requestStatus(upsertion.name, Some(publicScramMechanism), false, upsertion.iterations) // known mechanism, bad iterations is the cause of failure
} else {
requestStatus(upsertion.name, Some(publicScramMechanism), true, upsertion.iterations) // legal
}
}
}).filter { !_.legalRequest }
val illegalDeletions = deletions.map(deletion =>
if (deletion.name.isEmpty) {
requestStatus(deletion.name, None, false, 0) // no determined mechanism -- empty user is the cause of failure
} else {
val publicScramMechanism = scramMechanism(deletion.mechanism)
requestStatus(deletion.name, Some(publicScramMechanism), publicScramMechanism != ScramMechanism.UNKNOWN, 0)
}).filter { !_.legalRequest }
// map user names to error messages
val unknownScramMechanismMsg = "Unknown SCRAM mechanism"
val tooFewIterationsMsg = "Too few iterations"
val tooManyIterationsMsg = "Too many iterations"
val illegalRequestsByUser =
illegalDeletions.map(requestStatus =>
if (requestStatus.user.isEmpty) {
(requestStatus.user, usernameMustNotBeEmptyMsg)
} else {
(requestStatus.user, unknownScramMechanismMsg)
}
).toMap ++ illegalUpsertions.map(requestStatus =>
if (requestStatus.user.isEmpty) {
(requestStatus.user, usernameMustNotBeEmptyMsg)
} else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) {
(requestStatus.user, unknownScramMechanismMsg)
} else {
(requestStatus.user, if (requestStatus.iterations > maxIterations) {tooManyIterationsMsg} else {tooFewIterationsMsg})
}
).toMap
illegalRequestsByUser.foreach { case (user, errorMessage) =>
retval.results.add(new AlterUserScramCredentialsResult().setUser(user)
.setErrorCode(if (errorMessage == unknownScramMechanismMsg) {Errors.UNSUPPORTED_SASL_MECHANISM.code} else {Errors.UNACCEPTABLE_CREDENTIAL.code})
.setErrorMessage(errorMessage)) }
val invalidUsers = (illegalUpsertions ++ illegalDeletions).map(_.user).toSet
val initiallyValidUserMechanismPairs = (upsertions.filter(upsertion => !invalidUsers.contains(upsertion.name)).map(upsertion => (upsertion.name, upsertion.mechanism)) ++
deletions.filter(deletion => !invalidUsers.contains(deletion.name)).map(deletion => (deletion.name, deletion.mechanism)))
val usersWithDuplicateUserMechanismPairs = initiallyValidUserMechanismPairs.groupBy(identity).filter (
userMechanismPairAndOccurrencesTuple => userMechanismPairAndOccurrencesTuple._2.length > 1).keys.map(userMechanismPair => userMechanismPair._1).toSet
usersWithDuplicateUserMechanismPairs.foreach { user =>
retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user)
.setErrorCode(Errors.DUPLICATE_RESOURCE.code).setErrorMessage("A user credential cannot be altered twice in the same request")) }
def potentiallyValidUserMechanismPairs = initiallyValidUserMechanismPairs.filter(pair => !usersWithDuplicateUserMechanismPairs.contains(pair._1))
val potentiallyValidUsers = potentiallyValidUserMechanismPairs.map(_._1).toSet
val configsByPotentiallyValidUser = potentiallyValidUsers.map(user => (user, adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user)))).toMap
// check for deletion of a credential that does not exist
val invalidDeletions = deletions.filter(deletion => potentiallyValidUsers.contains(deletion.name)).filter(deletion =>
configsByPotentiallyValidUser(deletion.name).getProperty(mechanismName(deletion.mechanism)) == null)
val invalidUsersDueToInvalidDeletions = invalidDeletions.map(_.name).toSet
invalidUsersDueToInvalidDeletions.foreach { user =>
retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user)
.setErrorCode(Errors.RESOURCE_NOT_FOUND.code).setErrorMessage("Attempt to delete a user credential that does not exist")) }
// now prepare the new set of property values for users that don't have any issues identified above,
// keeping track of ones that fail
val usersToTryToAlter = potentiallyValidUsers.diff(invalidUsersDueToInvalidDeletions)
val usersFailedToPrepareProperties = usersToTryToAlter.map(user => {
try {
// deletions: remove property keys
deletions.filter(deletion => usersToTryToAlter.contains(deletion.name)).foreach { deletion =>
configsByPotentiallyValidUser(deletion.name).remove(mechanismName(deletion.mechanism)) }
// upsertions: put property key/value
upsertions.filter(upsertion => usersToTryToAlter.contains(upsertion.name)).foreach { upsertion =>
val mechanism = InternalScramMechanism.forMechanismName(mechanismName(upsertion.mechanism))
val credential = new ScramFormatter(mechanism)
.generateCredential(upsertion.salt, upsertion.saltedPassword, upsertion.iterations)
configsByPotentiallyValidUser(upsertion.name).put(mechanismName(upsertion.mechanism), ScramCredentialUtils.credentialToString(credential)) }
(user) // success, 1 element, won't be matched
} catch {
case e: Exception =>
info(s"Error encountered while altering user SCRAM credentials", e)
(user, e) // fail, 2 elements, will be matched
}
}).collect { case (user: String, exception: Exception) => (user, exception) }.toMap
// now persist the properties we have prepared, again keeping track of whatever fails
val usersFailedToPersist = usersToTryToAlter.filterNot(usersFailedToPrepareProperties.contains).map(user => {
try {
adminZkClient.changeConfigs(ConfigType.User, Sanitizer.sanitize(user), configsByPotentiallyValidUser(user))
(user) // success, 1 element, won't be matched
} catch {
case e: Exception =>
info(s"Error encountered while altering user SCRAM credentials", e)
(user, e) // fail, 2 elements, will be matched
}
}).collect { case (user: String, exception: Exception) => (user, exception) }.toMap
// report failures
usersFailedToPrepareProperties.++(usersFailedToPersist).foreach { case (user, exception) =>
val error = Errors.forException(exception)
retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user)
.setErrorCode(error.code)
.setErrorMessage(error.message)) }
// report successes
usersToTryToAlter.filterNot(usersFailedToPrepareProperties.contains).filterNot(usersFailedToPersist.contains).foreach { user =>
retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user)
.setErrorCode(Errors.NONE.code)) }
retval
}