in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java [200:260]
/**
 * Selects the {@link AttributeConverter} matching the supplied Flink type information.
 *
 * <p>The checks run in a fixed order, and the first match wins:
 *
 * <ol>
 *   <li>basic types: converter from the SDK default provider;
 *   <li>{@code byte[]}: binary attribute ({@code B}), accepting {@code SdkBytes} or a raw
 *       byte array at conversion time;
 *   <li>basic arrays: string set ({@code SS}) for string components, number set ({@code NS})
 *       for numeric components, otherwise {@code ArrayAttributeConverterProvider};
 *   <li>object arrays: delegated to {@code getObjectArrayTypeConverter};
 *   <li>remaining primitive arrays: number set ({@code NS}) for numeric components;
 *   <li>tuples: delegated to {@code getTupleTypeConverter};
 *   <li>other composite types: map attribute ({@code M}) built from a nested table schema.
 * </ol>
 *
 * @param typeInfo type information describing the attribute value
 * @param <AttributeT> Java type of the attribute value
 * @return converter for values described by {@code typeInfo}
 * @throws IllegalArgumentException if the type information, or the component type of a
 *     primitive array, is not supported
 * @throws FlinkRuntimeException if the nested table schema of a composite type cannot be
 *     introspected
 */
private <AttributeT> AttributeConverter<AttributeT> getAttributeConverter(
        TypeInformation<AttributeT> typeInfo) {
    if (typeInfo instanceof BasicTypeInfo) {
        return AttributeConverterProvider.defaultProvider()
                .converterFor(EnhancedType.of(typeInfo.getTypeClass()));
    }

    // byte[] has to be matched before the generic primitive-array branch further down.
    if (typeInfo.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
        return getAttributeConverter(
                AttributeValueType.B,
                value -> {
                    if (value instanceof SdkBytes) {
                        return AttributeValue.fromB((SdkBytes) value);
                    }
                    return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) value));
                });
    }

    if (typeInfo instanceof BasicArrayTypeInfo) {
        BasicArrayTypeInfo<AttributeT, ?> boxedArrayInfo =
                (BasicArrayTypeInfo<AttributeT, ?>) typeInfo;
        TypeInformation<?> boxedComponent = boxedArrayInfo.getComponentInfo();

        if (boxedComponent.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
            return getAttributeConverter(
                    AttributeValueType.SS,
                    strings -> AttributeValue.fromSs(Arrays.asList((String[]) strings)));
        }
        if (boxedComponent instanceof NumericTypeInfo) {
            return getAttributeConverter(
                    AttributeValueType.NS,
                    numbers ->
                            AttributeValue.fromNs(
                                    convertObjectArrayToStringList((Object[]) numbers)));
        }
        // Any other basic array is handled by the dedicated array converter provider.
        return new ArrayAttributeConverterProvider()
                .converterFor(EnhancedType.of(typeInfo.getTypeClass()));
    }

    if (typeInfo instanceof ObjectArrayTypeInfo) {
        return getObjectArrayTypeConverter((ObjectArrayTypeInfo<AttributeT, ?>) typeInfo);
    }

    if (typeInfo instanceof PrimitiveArrayTypeInfo) {
        PrimitiveArrayTypeInfo<AttributeT> unboxedArrayInfo =
                (PrimitiveArrayTypeInfo<AttributeT>) typeInfo;
        TypeInformation<?> primitiveComponent = unboxedArrayInfo.getComponentType();

        // Only numeric primitive arrays are supported here.
        if (!(primitiveComponent instanceof NumericTypeInfo)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Unsupported primitive array typeInfo %s", primitiveComponent));
        }
        return getAttributeConverter(
                AttributeValueType.NS,
                numbers -> AttributeValue.fromNs(convertPrimitiveArrayToStringList(numbers)));
    }

    // Tuples are checked ahead of the generic composite-type handling below.
    if (typeInfo instanceof TupleTypeInfo<?>) {
        return (AttributeConverter<AttributeT>)
                getTupleTypeConverter((TupleTypeInfo<?>) typeInfo);
    }

    if (!(typeInfo instanceof CompositeType)) {
        throw new IllegalArgumentException(String.format("Unsupported TypeInfo %s", typeInfo));
    }

    try {
        TableSchema<AttributeT> nestedSchema =
                createTableSchema((CompositeType<AttributeT>) typeInfo);
        return getAttributeConverter(
                AttributeValueType.M,
                item -> AttributeValue.fromM(nestedSchema.itemToMap(item, false)));
    } catch (IntrospectionException e) {
        throw new FlinkRuntimeException("Failed to extract nested table schema", e);
    }
}