public static void validateAwsConfiguration()

in flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java [401:433]


    /**
     * Validates the AWS credential-provider and region entries of the given configuration.
     *
     * <p>Each check runs only when its key is present in {@code config}:
     *
     * <ul>
     *   <li>If {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} is set, the value is checked via
     *       {@code validateCredentialProvider}; when the resolved provider type is {@code BASIC},
     *       both {@link AWSConfigConstants#AWS_ACCESS_KEY_ID} and {@link
     *       AWSConfigConstants#AWS_SECRET_ACCESS_KEY} must also be present as keys.
     *   <li>If {@link AWSConfigConstants#AWS_REGION} is set, the region obtained from {@code
     *       getRegion(config)} must be accepted by {@code isValidRegion}.
     * </ul>
     *
     * @param config the properties to validate
     * @throws IllegalArgumentException if the BASIC provider type is selected without both the
     *     access key ID and the secret key, or if the configured region is not recognised
     */
    public static void validateAwsConfiguration(Properties config) {
        if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {

            validateCredentialProvider(config);
            // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
            CredentialProvider credentialsProviderType =
                    getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
            if (credentialsProviderType == CredentialProvider.BASIC) {
                if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
                        || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
                    throw new IllegalArgumentException(
                            "Please set values for AWS Access Key ID ('"
                                    + AWSConfigConstants.AWS_ACCESS_KEY_ID
                                    + "') "
                                    + "and Secret Key ('"
                                    + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
                                    + "') when using the BASIC AWS credential provider type.");
                }
            }
        }

        if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
            // specified AWS Region name must be recognizable
            if (!isValidRegion(getRegion(config))) {
                StringBuilder validRegions = new StringBuilder();
                for (Region region : Region.regions()) {
                    // Separator goes only between entries so the message has no trailing ", ".
                    if (validRegions.length() > 0) {
                        validRegions.append(", ");
                    }
                    validRegions.append(region);
                }
                throw new IllegalArgumentException(
                        "Invalid AWS region set in config. Valid values are: " + validRegions);
            }
        }
    }