public static void validateAwsConfiguration()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/util/KinesisConfigUtil.java [422:458]


	/**
	 * Validates the AWS-related entries of the given configuration.
	 *
	 * <p>Each check only runs when its key is present in {@code config}:
	 * <ul>
	 *   <li>{@code AWSConfigConstants.AWS_CREDENTIALS_PROVIDER} must be the name of a
	 *       {@code CredentialProvider} constant. When it is {@code BASIC}, both
	 *       {@code AWSConfigConstants.AWS_ACCESS_KEY_ID} and
	 *       {@code AWSConfigConstants.AWS_SECRET_ACCESS_KEY} must also be present.</li>
	 *   <li>{@code AWSConfigConstants.AWS_REGION} must be accepted by
	 *       {@code AWSUtil.isValidRegion}.</li>
	 * </ul>
	 *
	 * @param config the configuration properties to validate
	 * @throws IllegalArgumentException if any of the checks above fails; for an unrecognized
	 *         credential provider type, the exception raised by the enum lookup is attached as the cause
	 */
	public static void validateAwsConfiguration(Properties config) {
		if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
			String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);

			// value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
			CredentialProvider providerType;
			try {
				providerType = CredentialProvider.valueOf(credentialsProviderType);
			} catch (IllegalArgumentException e) {
				// list the accepted values, separator only between entries (no trailing ", ")
				StringBuilder validTypes = new StringBuilder();
				for (CredentialProvider type : CredentialProvider.values()) {
					if (validTypes.length() > 0) {
						validTypes.append(", ");
					}
					validTypes.append(type.toString());
				}
				// keep the original exception as the cause so the failing lookup stays visible in the trace
				throw new IllegalArgumentException(
					"Invalid AWS Credential Provider Type set in config. Valid values are: " + validTypes.toString(), e);
			}

			// if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
			if (providerType == CredentialProvider.BASIC) {
				if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
					|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
					throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
						"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
				}
			}
		}

		if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
			// specified AWS Region name must be recognizable
			if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
				// list the accepted region names, separator only between entries (no trailing ", ")
				StringBuilder validRegions = new StringBuilder();
				for (Regions region : Regions.values()) {
					if (validRegions.length() > 0) {
						validRegions.append(", ");
					}
					validRegions.append(region.getName());
				}
				throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + validRegions.toString());
			}
		}
	}