static Properties forAwsCredentials()

in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java [75:116]


  /**
   * Builds the {@link Properties} that describe the given AWS credentials configuration, using
   * the keys and provider names defined by {@link AWSConfigConstants}.
   *
   * <p>The provider type is always written under {@link
   * AWSConfigConstants#AWS_CREDENTIALS_PROVIDER}:
   *
   * <ul>
   *   <li>default credentials: provider {@code AUTO}, no further keys;
   *   <li>basic credentials: provider {@code BASIC}, plus the access key id and the secret key;
   *   <li>profile credentials: provider {@code PROFILE}, plus the profile name and, only when one
   *       is present, the profile path.
   * </ul>
   *
   * @param awsCredentials the credentials configuration to translate
   * @return a new {@link Properties} instance holding the translated settings
   * @throws IllegalStateException if the credentials are neither default, basic, nor profile
   */
  static Properties forAwsCredentials(AwsCredentials awsCredentials) {
    // Every sub-key below is derived from this same provider key.
    final String providerKey = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
    final Properties config = new Properties();

    if (awsCredentials.isDefault()) {
      config.setProperty(providerKey, AWSConfigConstants.CredentialProvider.AUTO.name());
      return config;
    }

    if (awsCredentials.isBasic()) {
      config.setProperty(providerKey, AWSConfigConstants.CredentialProvider.BASIC.name());

      final AwsCredentials.BasicAwsCredentials basic = awsCredentials.asBasic();
      config.setProperty(AWSConfigConstants.accessKeyId(providerKey), basic.accessKeyId());
      config.setProperty(AWSConfigConstants.secretKey(providerKey), basic.secretAccessKey());
      return config;
    }

    if (awsCredentials.isProfile()) {
      config.setProperty(providerKey, AWSConfigConstants.CredentialProvider.PROFILE.name());

      final AwsCredentials.ProfileAwsCredentials profile = awsCredentials.asProfile();
      config.setProperty(AWSConfigConstants.profileName(providerKey), profile.name());
      // The profile path is optional; the key is left unset when no path was supplied.
      profile
          .path()
          .ifPresent(
              profilePath ->
                  config.setProperty(AWSConfigConstants.profilePath(providerKey), profilePath));
      return config;
    }

    throw new IllegalStateException(
        "Unrecognized AWS credentials configuration type: " + awsCredentials);
  }