in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java [75:116]
/**
 * Builds the {@link Properties} describing how AWS credentials should be resolved for the given
 * {@link AwsCredentials} configuration.
 *
 * <p>Every recognized configuration sets the {@code AWSConfigConstants.AWS_CREDENTIALS_PROVIDER}
 * key to the name of the matching {@code AWSConfigConstants.CredentialProvider} constant:
 *
 * <ul>
 *   <li>default credentials: {@code AUTO}, with no further keys;
 *   <li>basic credentials: {@code BASIC}, plus the access key id and the secret access key;
 *   <li>profile credentials: {@code PROFILE}, plus the profile name and, only when one is
 *       configured, the profile path.
 * </ul>
 *
 * @param awsCredentials the credentials configuration to translate
 * @return a freshly created {@link Properties} instance holding the settings listed above
 * @throws IllegalStateException if the configuration is neither default, basic, nor profile
 */
static Properties forAwsCredentials(AwsCredentials awsCredentials) {
  // All type-specific keys are derived from the provider key, so resolve it once.
  final String providerKey = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
  final Properties result = new Properties();

  if (awsCredentials.isDefault()) {
    result.setProperty(providerKey, AWSConfigConstants.CredentialProvider.AUTO.name());
    return result;
  }

  if (awsCredentials.isBasic()) {
    result.setProperty(providerKey, AWSConfigConstants.CredentialProvider.BASIC.name());

    final AwsCredentials.BasicAwsCredentials basic = awsCredentials.asBasic();
    result.setProperty(AWSConfigConstants.accessKeyId(providerKey), basic.accessKeyId());
    result.setProperty(AWSConfigConstants.secretKey(providerKey), basic.secretAccessKey());
    return result;
  }

  if (awsCredentials.isProfile()) {
    result.setProperty(providerKey, AWSConfigConstants.CredentialProvider.PROFILE.name());

    final AwsCredentials.ProfileAwsCredentials profile = awsCredentials.asProfile();
    result.setProperty(AWSConfigConstants.profileName(providerKey), profile.name());

    // The profile path is optional; the key is only written when a path was supplied.
    profile
        .path()
        .ifPresent(
            profilePath ->
                result.setProperty(AWSConfigConstants.profilePath(providerKey), profilePath));
    return result;
  }

  throw new IllegalStateException(
      "Unrecognized AWS credentials configuration type: " + awsCredentials);
}