in flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java [401:433]
public static void validateAwsConfiguration(Properties config) {
if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
validateCredentialProvider(config);
// if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
CredentialProvider credentialsProviderType =
getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
if (credentialsProviderType == CredentialProvider.BASIC) {
if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
throw new IllegalArgumentException(
"Please set values for AWS Access Key ID ('"
+ AWSConfigConstants.AWS_ACCESS_KEY_ID
+ "') "
+ "and Secret Key ('"
+ AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+ "') when using the BASIC AWS credential provider type.");
}
}
}
if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
// specified AWS Region name must be recognizable
if (!isValidRegion(getRegion(config))) {
StringBuilder sb = new StringBuilder();
for (Region region : Region.regions()) {
sb.append(region).append(", ");
}
throw new IllegalArgumentException(
"Invalid AWS region set in config. Valid values are: " + sb.toString());
}
}
}