def alterUserScramCredentials()

in core/src/main/scala/kafka/server/AdminManager.scala [1093:1231]


  /**
   * Applies a batch of SCRAM credential upsertions and deletions and reports one result per user.
   *
   * Processing happens in stages; a user rejected at one stage is excluded from all later stages,
   * together with every other request made for that same user:
   *
   *  1. Requests with an empty user name, an unknown SCRAM mechanism, or (upsertions only) an iteration
   *     count below the mechanism's minimum or above 16384 are rejected with
   *     `UNSUPPORTED_SASL_MECHANISM` (unknown mechanism) or `UNACCEPTABLE_CREDENTIAL` (everything else).
   *  1. Users for which the same (user, mechanism) pair appears more than once across the upsertions
   *     and deletions are rejected with `DUPLICATE_RESOURCE`.
   *  1. Users that try to delete a credential missing from their currently fetched config are rejected
   *     with `RESOURCE_NOT_FOUND`.
   *  1. For the remaining users the new property set is prepared and then written via `adminZkClient`.
   *     An exception in either step is mapped through `Errors.forException` and reported for that
   *     user only; all other users get `Errors.NONE`.
   *
   * @param upsertions credentials to create or replace
   * @param deletions  credentials to remove
   * @return response data holding the per-user results produced by the stages above
   */
  def alterUserScramCredentials(upsertions: Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion],
                                deletions: Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): AlterUserScramCredentialsResponseData = {

    def scramMechanism(mechanism: Byte): ScramMechanism = ScramMechanism.fromType(mechanism)

    def mechanismName(mechanism: Byte): String = scramMechanism(mechanism).mechanismName

    // Runs `action`; yields the user paired with the exception if it throws, None if it completes.
    def failureOf(user: String)(action: => Unit): Option[(String, Exception)] =
      try {
        action
        None
      } catch {
        case e: Exception =>
          info("Error encountered while altering user SCRAM credentials", e)
          Some((user, e))
      }

    val retval = new AlterUserScramCredentialsResponseData()

    // fail any user that is invalid due to an empty user name, an unknown SCRAM mechanism, or unacceptable number of iterations
    val maxIterations = 16384
    val illegalUpsertions = upsertions.map { upsertion =>
      if (upsertion.name.isEmpty) {
        // no determined mechanism -- empty user is the cause of failure
        requestStatus(upsertion.name, None, false, upsertion.iterations)
      } else {
        val publicScramMechanism = scramMechanism(upsertion.mechanism)
        // the iteration bounds are only looked up once the mechanism is known
        val legal = publicScramMechanism != ScramMechanism.UNKNOWN && {
          val minIterations = InternalScramMechanism.forMechanismName(publicScramMechanism.mechanismName).minIterations
          upsertion.iterations >= minIterations && upsertion.iterations <= maxIterations
        }
        requestStatus(upsertion.name, Some(publicScramMechanism), legal, upsertion.iterations)
      }
    }.filterNot(_.legalRequest)
    val illegalDeletions = deletions.map { deletion =>
      if (deletion.name.isEmpty) {
        // no determined mechanism -- empty user is the cause of failure
        requestStatus(deletion.name, None, false, 0)
      } else {
        val publicScramMechanism = scramMechanism(deletion.mechanism)
        requestStatus(deletion.name, Some(publicScramMechanism), publicScramMechanism != ScramMechanism.UNKNOWN, 0)
      }
    }.filterNot(_.legalRequest)

    // map user names to error messages; on a clash the upsertion's message replaces the deletion's
    val unknownScramMechanismMsg = "Unknown SCRAM mechanism"
    val tooFewIterationsMsg = "Too few iterations"
    val tooManyIterationsMsg = "Too many iterations"
    val illegalDeletionMessages = illegalDeletions.map { status =>
      val message = if (status.user.isEmpty) usernameMustNotBeEmptyMsg else unknownScramMechanismMsg
      (status.user, message)
    }.toMap
    val illegalUpsertionMessages = illegalUpsertions.map { status =>
      val message =
        if (status.user.isEmpty) usernameMustNotBeEmptyMsg
        else if (status.mechanism.contains(ScramMechanism.UNKNOWN)) unknownScramMechanismMsg
        else if (status.iterations > maxIterations) tooManyIterationsMsg
        else tooFewIterationsMsg
      (status.user, message)
    }.toMap
    val illegalRequestsByUser = illegalDeletionMessages ++ illegalUpsertionMessages

    illegalRequestsByUser.foreach { case (user, errorMessage) =>
      val errorCode =
        if (errorMessage == unknownScramMechanismMsg) Errors.UNSUPPORTED_SASL_MECHANISM.code
        else Errors.UNACCEPTABLE_CREDENTIAL.code
      retval.results.add(new AlterUserScramCredentialsResult()
        .setUser(user)
        .setErrorCode(errorCode)
        .setErrorMessage(errorMessage))
    }

    // a user with any illegal request is dropped entirely, including that user's otherwise legal requests
    val invalidUsers = (illegalUpsertions ++ illegalDeletions).map(_.user).toSet
    val initiallyValidUserMechanismPairs =
      upsertions.filterNot(upsertion => invalidUsers.contains(upsertion.name)).map(upsertion => (upsertion.name, upsertion.mechanism)) ++
        deletions.filterNot(deletion => invalidUsers.contains(deletion.name)).map(deletion => (deletion.name, deletion.mechanism))

    // the same (user, mechanism) pair may be named only once across upsertions and deletions
    val usersWithDuplicateUserMechanismPairs = initiallyValidUserMechanismPairs
      .groupBy(identity)
      .collect { case ((user, _), occurrences) if occurrences.size > 1 => user }
      .toSet
    usersWithDuplicateUserMechanismPairs.foreach { user =>
      retval.results.add(new AlterUserScramCredentialsResult()
        .setUser(user)
        .setErrorCode(Errors.DUPLICATE_RESOURCE.code)
        .setErrorMessage("A user credential cannot be altered twice in the same request"))
    }

    val potentiallyValidUsers = initiallyValidUserMechanismPairs
      .map { case (user, _) => user }
      .filterNot(usersWithDuplicateUserMechanismPairs.contains)
      .toSet
    val configsByPotentiallyValidUser = potentiallyValidUsers.map { user =>
      (user, adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user)))
    }.toMap

    // check for deletion of a credential that does not exist
    val invalidUsersDueToInvalidDeletions = deletions
      .filter(deletion => potentiallyValidUsers.contains(deletion.name))
      .filter(deletion => configsByPotentiallyValidUser(deletion.name).getProperty(mechanismName(deletion.mechanism)) == null)
      .map(_.name)
      .toSet
    invalidUsersDueToInvalidDeletions.foreach { user =>
      retval.results.add(new AlterUserScramCredentialsResult()
        .setUser(user)
        .setErrorCode(Errors.RESOURCE_NOT_FOUND.code)
        .setErrorMessage("Attempt to delete a user credential that does not exist"))
    }

    // now prepare the new set of property values for users that don't have any issues identified above,
    // keeping track of ones that fail
    val usersToTryToAlter = potentiallyValidUsers.diff(invalidUsersDueToInvalidDeletions)
    // Group once so each user's try block touches only that user's own requests. Applying every
    // user's requests inside every user's try block would let one bad upsertion fail all users.
    val deletionsByUser = deletions.groupBy(_.name)
    val upsertionsByUser = upsertions.groupBy(_.name)
    val usersFailedToPrepareProperties = usersToTryToAlter.flatMap { user =>
      failureOf(user) {
        val userConfigs = configsByPotentiallyValidUser(user)
        // deletions: remove property keys
        deletionsByUser.getOrElse(user, Seq.empty).foreach { deletion =>
          userConfigs.remove(mechanismName(deletion.mechanism))
        }
        // upsertions: put property key/value
        upsertionsByUser.getOrElse(user, Seq.empty).foreach { upsertion =>
          val propertyKey = mechanismName(upsertion.mechanism)
          val credential = new ScramFormatter(InternalScramMechanism.forMechanismName(propertyKey))
            .generateCredential(upsertion.salt, upsertion.saltedPassword, upsertion.iterations)
          userConfigs.put(propertyKey, ScramCredentialUtils.credentialToString(credential))
        }
      }
    }.toMap

    // now persist the properties we have prepared, again keeping track of whatever fails
    val usersFailedToPersist = usersToTryToAlter.filterNot(usersFailedToPrepareProperties.contains).flatMap { user =>
      failureOf(user) {
        adminZkClient.changeConfigs(ConfigType.User, Sanitizer.sanitize(user), configsByPotentiallyValidUser(user))
      }
    }.toMap

    // report failures
    val failuresByUser = usersFailedToPrepareProperties ++ usersFailedToPersist
    failuresByUser.foreach { case (user, exception) =>
      val error = Errors.forException(exception)
      retval.results.add(new AlterUserScramCredentialsResult()
        .setUser(user)
        .setErrorCode(error.code)
        .setErrorMessage(error.message))
    }

    // report successes
    usersToTryToAlter.filterNot(failuresByUser.contains).foreach { user =>
      retval.results.add(new AlterUserScramCredentialsResult()
        .setUser(user)
        .setErrorCode(Errors.NONE.code))
    }

    retval
  }