private static AWSCredentialsProvider getCredentialsProvider()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java [131:201]


    /**
     * Creates the {@link AWSCredentialsProvider} that corresponds to the credential provider type
     * resolved by {@code AWSClientUtil.getCredentialProviderType(configProps, configPrefix)}.
     *
     * <p>Mapping of provider type to the returned provider:
     *
     * <ul>
     *   <li>{@code ENV_VAR}: an {@code EnvironmentVariableCredentialsProvider}.
     *   <li>{@code SYS_PROP}: a {@code SystemPropertiesCredentialsProvider}.
     *   <li>{@code PROFILE}: a {@code ProfileCredentialsProvider} built from the configured profile
     *       name, and additionally the profile path when one is configured.
     *   <li>{@code BASIC}: an anonymous provider that builds {@code BasicAWSCredentials} from the
     *       access key id and secret key properties each time credentials are requested.
     *   <li>{@code ASSUME_ROLE}: an {@code STSAssumeRoleSessionCredentialsProvider} configured with
     *       the role ARN, role session name, external id and an STS client obtained from {@code
     *       createStsClient(configProps, configPrefix)}.
     *   <li>{@code WEB_IDENTITY_TOKEN}: a {@code WebIdentityTokenCredentialsProvider} configured
     *       with the role ARN, role session name and web identity token file.
     *   <li>{@code AUTO}: a {@code DefaultAWSCredentialsProviderChain}.
     * </ul>
     *
     * @param configProps the configuration properties the provider settings are read from
     * @param configPrefix the prefix handed to {@code AWSConfigConstants} to derive property keys
     * @return the credentials provider for the resolved provider type
     * @throws IllegalArgumentException if the resolved provider type is none of the types above
     */
    private static AWSCredentialsProvider getCredentialsProvider(
            final Properties configProps, final String configPrefix) {
        final CredentialProvider providerType =
                AWSClientUtil.getCredentialProviderType(configProps, configPrefix);

        switch (providerType) {
            case ENV_VAR:
                return new EnvironmentVariableCredentialsProvider();

            case SYS_PROP:
                return new SystemPropertiesCredentialsProvider();

            case PROFILE: {
                final String name =
                        configProps.getProperty(AWSConfigConstants.profileName(configPrefix), null);
                final String path =
                        configProps.getProperty(AWSConfigConstants.profilePath(configPrefix), null);

                // Only pass a path through when one was actually configured.
                if (path != null) {
                    return new ProfileCredentialsProvider(path, name);
                }
                return new ProfileCredentialsProvider(name);
            }

            case BASIC: {
                return new AWSCredentialsProvider() {
                    @Override
                    public AWSCredentials getCredentials() {
                        // The properties are looked up on every call rather than captured once.
                        final String accessKeyId =
                                configProps.getProperty(
                                        AWSConfigConstants.accessKeyId(configPrefix));
                        final String secretKey =
                                configProps.getProperty(AWSConfigConstants.secretKey(configPrefix));
                        return new BasicAWSCredentials(accessKeyId, secretKey);
                    }

                    @Override
                    public void refresh() {
                        // intentionally empty: there is no cached state to refresh
                    }
                };
            }

            case ASSUME_ROLE: {
                final String roleArn =
                        configProps.getProperty(AWSConfigConstants.roleArn(configPrefix));
                final String sessionName =
                        configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix));
                final STSAssumeRoleSessionCredentialsProvider.Builder assumeRoleBuilder =
                        new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName);

                final String externalId =
                        configProps.getProperty(AWSConfigConstants.externalId(configPrefix));

                // The STS client is created only once the builder exists.
                return assumeRoleBuilder
                        .withExternalId(externalId)
                        .withStsClient(createStsClient(configProps, configPrefix))
                        .build();
            }

            case WEB_IDENTITY_TOKEN: {
                final String roleArn =
                        configProps.getProperty(AWSConfigConstants.roleArn(configPrefix), null);
                final String sessionName =
                        configProps.getProperty(
                                AWSConfigConstants.roleSessionName(configPrefix), null);
                final String tokenFile =
                        configProps.getProperty(
                                AWSConfigConstants.webIdentityTokenFile(configPrefix), null);

                return WebIdentityTokenCredentialsProvider.builder()
                        .roleArn(roleArn)
                        .roleSessionName(sessionName)
                        .webIdentityTokenFile(tokenFile)
                        .build();
            }

            case AUTO:
                return new DefaultAWSCredentialsProviderChain();

            default:
                throw new IllegalArgumentException(
                        "Credential provider not supported: " + providerType);
        }
    }