in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/binders/AwsCredentialsJsonDeserializer.java [37:66]
// Reads one JSON object from the parser and converts it into an AwsCredentials
// instance, selected by the object's "type" field:
//   - DEFAULT_TYPE: credentials from the default provider chain; no other fields are read.
//   - BASIC_TYPE:   requires "accessKeyId" and "secretAccessKey".
//   - PROFILE_TYPE: requires "profileName"; "profilePath" is optional.
//
// Throws IllegalArgumentException if a required field is missing or null, or if
// "type" is not one of the supported values. Throws IOException if the parser
// fails to read the JSON object.
public AwsCredentials deserialize(
    JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
  final ObjectNode awsCredentialsNode = jsonParser.readValueAs(ObjectNode.class);
  final String typeString = requireTextField(awsCredentialsNode, "type");
  switch (typeString) {
    case DEFAULT_TYPE:
      return AwsCredentials.fromDefaultProviderChain();
    case BASIC_TYPE:
      return AwsCredentials.basic(
          requireTextField(awsCredentialsNode, "accessKeyId"),
          requireTextField(awsCredentialsNode, "secretAccessKey"));
    case PROFILE_TYPE:
      final String profileName = requireTextField(awsCredentialsNode, "profileName");
      // "profilePath" is optional; when absent, the single-argument factory is used.
      final JsonNode pathNode = awsCredentialsNode.get("profilePath");
      if (pathNode != null) {
        return AwsCredentials.profile(profileName, pathNode.asText());
      } else {
        return AwsCredentials.profile(profileName);
      }
    default:
      final List<String> validValues = Arrays.asList(DEFAULT_TYPE, BASIC_TYPE, PROFILE_TYPE);
      throw new IllegalArgumentException(
          "Invalid AWS credential type: "
              + typeString
              + "; valid values are ["
              + String.join(", ", validValues)
              + "]");
  }
}

// Returns the text value of a required field. Fails with a message naming the
// field instead of letting a missing field surface as a NullPointerException,
// or an explicit JSON null be read as the literal string "null".
private static String requireTextField(ObjectNode node, String fieldName) {
  final JsonNode value = node.get(fieldName);
  if (value == null || value.isNull()) {
    throw new IllegalArgumentException(
        "Missing required field '" + fieldName + "' in AWS credentials definition");
  }
  return value.asText();
}