public AwsCredentials deserialize()

in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/binders/AwsCredentialsJsonDeserializer.java [37:66]


  public AwsCredentials deserialize(
      JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
    // Reads the current JSON value as an object and builds the AwsCredentials variant selected by
    // its "type" field:
    //   - DEFAULT_TYPE: default provider chain, no further fields are read.
    //   - BASIC_TYPE:   requires "accessKeyId" and "secretAccessKey".
    //   - PROFILE_TYPE: requires "profileName"; "profilePath" is optional.
    // Throws IllegalArgumentException if a required field is missing or JSON null, or if "type" is
    // not one of the supported values. IOException propagates from the underlying parser.
    final ObjectNode awsCredentialsNode = jsonParser.readValueAs(ObjectNode.class);
    final String typeString = requireTextField(awsCredentialsNode, "type");

    switch (typeString) {
      case DEFAULT_TYPE:
        return AwsCredentials.fromDefaultProviderChain();
      case BASIC_TYPE:
        return AwsCredentials.basic(
            requireTextField(awsCredentialsNode, "accessKeyId"),
            requireTextField(awsCredentialsNode, "secretAccessKey"));
      case PROFILE_TYPE:
        final String profileName = requireTextField(awsCredentialsNode, "profileName");
        final JsonNode pathNode = awsCredentialsNode.get("profilePath");
        // An explicit JSON null is treated like an absent field; otherwise asText() would yield
        // the literal string "null" and be used as the profile path.
        if (pathNode != null && !pathNode.isNull()) {
          return AwsCredentials.profile(profileName, pathNode.asText());
        }
        return AwsCredentials.profile(profileName);
      default:
        final List<String> validValues = Arrays.asList(DEFAULT_TYPE, BASIC_TYPE, PROFILE_TYPE);
        throw new IllegalArgumentException(
            "Invalid AWS credential type: "
                + typeString
                + "; valid values are ["
                + String.join(", ", validValues)
                + "]");
    }
  }

  /**
   * Returns the text value of a required field of the given JSON object.
   *
   * @param node the JSON object to read from
   * @param fieldName the name of the required field
   * @return the field's value as text
   * @throws IllegalArgumentException if the field is absent or is JSON null
   */
  private static String requireTextField(ObjectNode node, String fieldName) {
    final JsonNode fieldNode = node.get(fieldName);
    if (fieldNode == null || fieldNode.isNull()) {
      throw new IllegalArgumentException(
          "Missing required field '" + fieldName + "' in AWS credentials definition");
    }
    return fieldNode.asText();
  }