in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/util/KinesisConfigUtil.java [422:458]
/**
 * Validates the AWS-related entries of the given configuration.
 *
 * <p>Two optional settings are checked, each only when its key is present in {@code config}:
 * <ul>
 *   <li>{@code AWSConfigConstants.AWS_CREDENTIALS_PROVIDER} must name a {@link CredentialProvider}
 *       constant; when it is {@code BASIC}, both {@code AWSConfigConstants.AWS_ACCESS_KEY_ID} and
 *       {@code AWSConfigConstants.AWS_SECRET_ACCESS_KEY} must be present as well.</li>
 *   <li>{@code AWSConfigConstants.AWS_REGION} must be accepted by {@code AWSUtil.isValidRegion}.</li>
 * </ul>
 *
 * @param config the properties to validate
 * @throws IllegalArgumentException if the credentials provider type is not recognized, if the
 *         BASIC provider is selected without both keys, or if the region is not valid
 */
public static void validateAwsConfiguration(Properties config) {
    if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
        String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);

        // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
        CredentialProvider providerType;
        try {
            providerType = CredentialProvider.valueOf(credentialsProviderType);
        } catch (IllegalArgumentException e) {
            // List the accepted values, separated (not terminated) by ", ".
            StringBuilder validTypes = new StringBuilder();
            for (CredentialProvider type : CredentialProvider.values()) {
                if (validTypes.length() > 0) {
                    validTypes.append(", ");
                }
                validTypes.append(type.toString());
            }
            // Keep the original exception as the cause so the failing lookup stays visible in the trace.
            throw new IllegalArgumentException(
                "Invalid AWS Credential Provider Type set in config. Valid values are: " + validTypes, e);
        }

        // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
        if (providerType == CredentialProvider.BASIC) {
            if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
                || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
                throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
                    "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
            }
        }
    }

    if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
        // specified AWS Region name must be recognizable
        if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
            // List the accepted region names, separated (not terminated) by ", ".
            StringBuilder validRegions = new StringBuilder();
            for (Regions region : Regions.values()) {
                if (validRegions.length() > 0) {
                    validRegions.append(", ");
                }
                validRegions.append(region.getName());
            }
            throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + validRegions);
        }
    }
}