private AttributeConverter getAttributeConverter()

in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java [200:260]


    /**
     * Resolves the {@link AttributeConverter} used for values described by the given type
     * information.
     *
     * <p>The checks below run in order and the first match wins:
     *
     * <ul>
     *   <li>{@link BasicTypeInfo}: converter looked up from the default {@link
     *       AttributeConverterProvider}.
     *   <li>Primitive byte array: binary ({@code B}) attribute; the value may be either an {@link
     *       SdkBytes} or a {@code byte[]}.
     *   <li>{@link BasicArrayTypeInfo}: string set ({@code SS}) for string components, number set
     *       ({@code NS}) for numeric components, otherwise the converter supplied by {@link
     *       ArrayAttributeConverterProvider}.
     *   <li>{@link ObjectArrayTypeInfo}: delegated to {@code getObjectArrayTypeConverter}.
     *   <li>Any other {@link PrimitiveArrayTypeInfo}: number set ({@code NS}) for numeric
     *       components; every other component type is rejected.
     *   <li>{@link TupleTypeInfo}: delegated to {@code getTupleTypeConverter}.
     *   <li>Any other {@link CompositeType}: map ({@code M}) attribute produced through a nested
     *       {@link TableSchema}.
     * </ul>
     *
     * @param typeInfo type information of the attribute to convert
     * @param <AttributeT> type of the attribute value
     * @return the converter selected for {@code typeInfo}
     * @throws IllegalArgumentException if {@code typeInfo} matches none of the cases above, or is
     *     a primitive array whose component type is not numeric
     * @throws FlinkRuntimeException if building the nested table schema fails with an {@link
     *     IntrospectionException}
     */
    private <AttributeT> AttributeConverter<AttributeT> getAttributeConverter(
            TypeInformation<AttributeT> typeInfo) {
        if (typeInfo instanceof BasicTypeInfo) {
            final EnhancedType<AttributeT> basicType = EnhancedType.of(typeInfo.getTypeClass());
            return AttributeConverterProvider.defaultProvider().converterFor(basicType);
        }

        // Must be tested before the generic PrimitiveArrayTypeInfo case further down, which would
        // otherwise also match byte[].
        if (typeInfo.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
            return getAttributeConverter(
                    AttributeValueType.B,
                    value -> {
                        if (value instanceof SdkBytes) {
                            return AttributeValue.fromB((SdkBytes) value);
                        }
                        return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) value));
                    });
        }

        if (typeInfo instanceof BasicArrayTypeInfo) {
            final TypeInformation<?> boxedElementInfo =
                    ((BasicArrayTypeInfo<AttributeT, ?>) typeInfo).getComponentInfo();

            if (boxedElementInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
                return getAttributeConverter(
                        AttributeValueType.SS,
                        strings -> AttributeValue.fromSs(Arrays.asList((String[]) strings)));
            }
            if (boxedElementInfo instanceof NumericTypeInfo) {
                return getAttributeConverter(
                        AttributeValueType.NS,
                        numbers ->
                                AttributeValue.fromNs(
                                        convertObjectArrayToStringList((Object[]) numbers)));
            }

            // Remaining basic arrays are handled by the dedicated array converter provider.
            final EnhancedType<AttributeT> arrayType = EnhancedType.of(typeInfo.getTypeClass());
            return new ArrayAttributeConverterProvider().converterFor(arrayType);
        }

        if (typeInfo instanceof ObjectArrayTypeInfo) {
            return getObjectArrayTypeConverter((ObjectArrayTypeInfo<AttributeT, ?>) typeInfo);
        }

        if (typeInfo instanceof PrimitiveArrayTypeInfo) {
            final TypeInformation<?> primitiveElementInfo =
                    ((PrimitiveArrayTypeInfo<AttributeT>) typeInfo).getComponentType();

            if (!(primitiveElementInfo instanceof NumericTypeInfo)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Unsupported primitive array typeInfo %s", primitiveElementInfo));
            }
            return getAttributeConverter(
                    AttributeValueType.NS,
                    numbers -> AttributeValue.fromNs(convertPrimitiveArrayToStringList(numbers)));
        }

        // Tuples get their own converter, so they are checked ahead of the CompositeType case.
        if (typeInfo instanceof TupleTypeInfo<?>) {
            return (AttributeConverter<AttributeT>)
                    getTupleTypeConverter((TupleTypeInfo<?>) typeInfo);
        }

        if (typeInfo instanceof CompositeType) {
            try {
                final TableSchema<AttributeT> nestedSchema =
                        createTableSchema((CompositeType<AttributeT>) typeInfo);
                return getAttributeConverter(
                        AttributeValueType.M,
                        item -> AttributeValue.fromM(nestedSchema.itemToMap(item, false)));
            } catch (IntrospectionException e) {
                throw new FlinkRuntimeException("Failed to extract nested table schema", e);
            }
        }

        throw new IllegalArgumentException(String.format("Unsupported TypeInfo %s", typeInfo));
    }