in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java [131:201]
private static AWSCredentialsProvider getCredentialsProvider(
final Properties configProps, final String configPrefix) {
CredentialProvider credentialProviderType =
AWSClientUtil.getCredentialProviderType(configProps, configPrefix);
switch (credentialProviderType) {
case ENV_VAR:
return new EnvironmentVariableCredentialsProvider();
case SYS_PROP:
return new SystemPropertiesCredentialsProvider();
case PROFILE:
String profileName =
configProps.getProperty(AWSConfigConstants.profileName(configPrefix), null);
String profileConfigPath =
configProps.getProperty(AWSConfigConstants.profilePath(configPrefix), null);
return (profileConfigPath == null)
? new ProfileCredentialsProvider(profileName)
: new ProfileCredentialsProvider(profileConfigPath, profileName);
case BASIC:
return new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(
configProps.getProperty(
AWSConfigConstants.accessKeyId(configPrefix)),
configProps.getProperty(
AWSConfigConstants.secretKey(configPrefix)));
}
@Override
public void refresh() {
// do nothing
}
};
case ASSUME_ROLE:
return new STSAssumeRoleSessionCredentialsProvider.Builder(
configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)),
configProps.getProperty(
AWSConfigConstants.roleSessionName(configPrefix)))
.withExternalId(
configProps.getProperty(
AWSConfigConstants.externalId(configPrefix)))
.withStsClient(createStsClient(configProps, configPrefix))
.build();
case WEB_IDENTITY_TOKEN:
return WebIdentityTokenCredentialsProvider.builder()
.roleArn(
configProps.getProperty(
AWSConfigConstants.roleArn(configPrefix), null))
.roleSessionName(
configProps.getProperty(
AWSConfigConstants.roleSessionName(configPrefix), null))
.webIdentityTokenFile(
configProps.getProperty(
AWSConfigConstants.webIdentityTokenFile(configPrefix),
null))
.build();
case AUTO:
return new DefaultAWSCredentialsProviderChain();
default:
throw new IllegalArgumentException(
"Credential provider not supported: " + credentialProviderType);
}
}