public AlterUserScramCredentialsResult alterUserScramCredentials()

in clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java [4121:4242]


    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
                                                                     AlterUserScramCredentialsOptions options) {
        final long now = time.milliseconds();
        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
        for (UserScramCredentialAlteration alteration: alterations) {
            futures.put(alteration.user(), new KafkaFutureImpl<>());
        }
        final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>();
        // We need to keep track of users with deletions of an unknown SCRAM mechanism
        final String usernameMustNotBeEmptyMsg = "Username must not be empty";
        String passwordMustNotBeEmptyMsg = "Password must not be empty";
        final String unknownScramMechanismMsg = "Unknown SCRAM mechanism";
        alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> {
            final String user = alteration.user();
            if (user == null || user.isEmpty()) {
                userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg));
            } else {
                UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration;
                ScramMechanism mechanism = deletion.mechanism();
                if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
                    userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
                }
            }
        });
        // Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException,
        // so keep track of which users are affected by such a failure so we can fail all their alterations later
        final Map<String, Map<ScramMechanism, AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions = new HashMap<>();
        alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion)
                .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user()))
                .forEach(alteration -> {
                    final String user = alteration.user();
                    if (user == null || user.isEmpty()) {
                        userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg));
                    } else {
                        UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration;
                        try {
                            byte[] password = upsertion.password();
                            if (password == null || password.length == 0) {
                                userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg));
                            } else {
                                ScramMechanism mechanism = upsertion.credentialInfo().mechanism();
                                if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
                                    userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
                                } else {
                                    userInsertions.putIfAbsent(user, new HashMap<>());
                                    userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion));
                                }
                            }
                        } catch (NoSuchAlgorithmException e) {
                            // we might overwrite an exception from a previous alteration, but we don't really care
                            // since we just need to mark this user as having at least one illegal alteration
                            // and make an exception instance available for completing the corresponding future exceptionally
                            userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
                        } catch (InvalidKeyException e) {
                            // generally shouldn't happen since we deal with the empty password case above,
                            // but we still need to catch/handle it
                            userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e));
                        }
                    }
                });

        // submit alterations only for users that do not have an illegal alteration as identified above
        Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
                new ControllerNodeProvider()) {
            @Override
            public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                return new AlterUserScramCredentialsRequest.Builder(
                        new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
                                .filter(a -> a instanceof UserScramCredentialUpsertion)
                                .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user()))
                                .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism()))
                                .collect(Collectors.toList()))
                        .setDeletions(alterations.stream()
                                .filter(a -> a instanceof UserScramCredentialDeletion)
                                .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user()))
                                .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d))
                                .collect(Collectors.toList())));
            }

            @Override
            public void handleResponse(AbstractResponse abstractResponse) {
                AlterUserScramCredentialsResponse response = (AlterUserScramCredentialsResponse) abstractResponse;
                // Check for controller change
                for (Errors error : response.errorCounts().keySet()) {
                    if (error == Errors.NOT_CONTROLLER) {
                        handleNotControllerError(error);
                    }
                }
                /* Now that we have the results for the ones we sent,
                 * fail any users that have an illegal alteration as identified above.
                 * Be sure to do this after the NOT_CONTROLLER error check above
                 * so that all errors are consistent in that case.
                 */
                userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> {
                    futures.get(entry.getKey()).completeExceptionally(entry.getValue());
                });
                response.data().results().forEach(result -> {
                    KafkaFutureImpl<Void> future = futures.get(result.user());
                    if (future == null) {
                        log.warn("Server response mentioned unknown user {}", result.user());
                    } else {
                        Errors error = Errors.forCode(result.errorCode());
                        if (error != Errors.NONE) {
                            future.completeExceptionally(error.exception(result.errorMessage()));
                        } else {
                            future.complete(null);
                        }
                    }
                });
                completeUnrealizedFutures(
                    futures.entrySet().stream(),
                    user -> "The broker response did not contain a result for user " + user);
            }

            @Override
            void handleFailure(Throwable throwable) {
                completeAllExceptionally(futures.values(), throwable);
            }
        };
        runnable.call(call, now);
        return new AlterUserScramCredentialsResult(new HashMap<>(futures));
    }