private static Object convertToAvroObject()

in nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java [705:906]


    /**
     * Converts a raw value into the object form used for the given Avro schema. Map values, record fields,
     * array elements and union branches are converted by calling this method again on the nested value.
     *
     * @param rawValue the value to convert; {@code null} is returned unchanged
     * @param fieldSchema the Avro schema that selects which conversion is applied
     * @param fieldName the name of the field being converted; it is passed to the conversion helpers and extended
     *                  with {@code [key]}, {@code [index]} or {@code /child} when descending into nested values
     * @param charset the charset used to encode a String as bytes and passed on when converting a value to a String
     * @return the converted value, or {@code null} when {@code rawValue} is {@code null} or the schema type is NULL
     * @throws IllegalTypeConversionException if the value cannot be converted to the requested BYTES/FIXED, MAP, RECORD,
     *                                        ARRAY or ENUM representation
     */
    private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName, final Charset charset) {
        if (rawValue == null) {
            return null;
        }

        switch (fieldSchema.getType()) {
            case INT: {
                final LogicalType logicalType = fieldSchema.getLogicalType();
                if (logicalType == null) {
                    return DataTypeUtils.toInteger(rawValue, fieldName);
                }

                if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
                    // The date is written as its epoch-day count.
                    final String format = determineDataType(fieldSchema).getFormat();
                    final FieldConverter<Object, LocalDate> fieldConverter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class);
                    final LocalDate localDate = fieldConverter.convertField(rawValue, Optional.ofNullable(format), fieldName);
                    return (int) localDate.toEpochDay();
                } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
                    final String format = determineDataType(fieldSchema).getFormat();
                    return getLogicalTimeMillis(rawValue, format, fieldName);
                }

                // Any other logical type on an INT is treated as a plain integer.
                return DataTypeUtils.toInteger(rawValue, fieldName);
            }
            case LONG: {
                final LogicalType logicalType = fieldSchema.getLogicalType();
                if (logicalType == null) {
                    return DataTypeUtils.toLong(rawValue, fieldName);
                }

                if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) {
                    // Time of day: elapsed time since midnight in the system default zone, measured in
                    // milliseconds and then scaled by ONE_THOUSAND_MILLISECONDS.
                    final long epochMilli = getLongFromTimestamp(rawValue, fieldSchema, fieldName);
                    final ZonedDateTime zonedDateTime = Instant.ofEpochMilli(epochMilli).atZone(ZoneId.systemDefault());
                    final ZonedDateTime midnight = zonedDateTime.truncatedTo(ChronoUnit.DAYS);
                    final Duration duration = Duration.between(midnight, zonedDateTime);
                    return duration.toMillis() * ONE_THOUSAND_MILLISECONDS;
                } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
                    return getLongFromTimestamp(rawValue, fieldSchema, fieldName);
                } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
                    return getLongFromTimestamp(rawValue, fieldSchema, fieldName) * ONE_THOUSAND_MILLISECONDS;
                }

                // Any other logical type on a LONG is treated as a plain long.
                return DataTypeUtils.toLong(rawValue, fieldName);
            }
            case BYTES:
            case FIXED: {
                final LogicalType logicalType = fieldSchema.getLogicalType();
                if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
                    final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
                    final BigDecimal rawDecimal;
                    if (rawValue instanceof BigDecimal) {
                        rawDecimal = (BigDecimal) rawValue;
                    } else if (rawValue instanceof Double) {
                        rawDecimal = BigDecimal.valueOf((Double) rawValue);
                    } else if (rawValue instanceof String) {
                        rawDecimal = new BigDecimal((String) rawValue);
                    } else if (rawValue instanceof Integer) {
                        rawDecimal = new BigDecimal((Integer) rawValue);
                    } else if (rawValue instanceof Long) {
                        rawDecimal = new BigDecimal((Long) rawValue);
                    } else {
                        throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
                    }

                    // Coerce the value to the scale declared by the schema, rounding half-up when they differ.
                    final int desiredScale = decimalType.getScale();
                    final BigDecimal decimal = rawDecimal.scale() == desiredScale
                        ? rawDecimal : rawDecimal.setScale(desiredScale, RoundingMode.HALF_UP);
                    return fieldSchema.getType() == Type.BYTES
                        ? new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType)
                        : new Conversions.DecimalConversion().toFixed(decimal, fieldSchema, logicalType);
                }

                if (rawValue instanceof byte[]) {
                    return ByteBuffer.wrap((byte[]) rawValue);
                }
                if (rawValue instanceof String) {
                    return ByteBuffer.wrap(((String) rawValue).getBytes(charset));
                }
                if (rawValue instanceof Object[]) {
                    if (fieldSchema.getType() == Type.FIXED && "INT96".equals(fieldSchema.getName())) {
                        // A FIXED schema named INT96 is returned as a GenericData.Fixed built from the unboxed bytes.
                        final Object[] rawObjects = (Object[]) rawValue;
                        final byte[] rawBytes = new byte[rawObjects.length];
                        for (int elementIndex = 0; elementIndex < rawObjects.length; elementIndex++) {
                            rawBytes[elementIndex] = (Byte) rawObjects[elementIndex];
                        }

                        return new GenericData.Fixed(fieldSchema, rawBytes);
                    } else {
                        return convertByteArray((Object[]) rawValue);
                    }
                }
                if (rawValue instanceof Blob) {
                    final Blob blob = (Blob) rawValue;
                    // Close the Blob's stream once its content has been copied; any failure while opening,
                    // reading or closing it is reported as a conversion failure with the cause preserved.
                    try (final java.io.InputStream blobStream = blob.getBinaryStream()) {
                        return ByteBuffer.wrap(IOUtils.toByteArray(blobStream));
                    } catch (final Exception e) {
                        throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e);
                    }
                }

                throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
            }
            case MAP: {
                if (rawValue instanceof Record) {
                    final Record recordValue = (Record) rawValue;
                    final Map<String, Object> map = new HashMap<>();
                    for (final RecordField recordField : recordValue.getSchema().getFields()) {
                        final Object v = recordValue.getValue(recordField);
                        // Fields of the Record that have no value are left out of the resulting map.
                        if (v != null) {
                            final String key = recordField.getFieldName();
                            map.put(key, convertToAvroObject(v, fieldSchema.getValueType(), fieldName + "[" + key + "]", charset));
                        }
                    }

                    return map;
                } else if (rawValue instanceof Map) {
                    final Map<String, Object> objectMap = (Map<String, Object>) rawValue;
                    final Map<String, Object> map = new HashMap<>(objectMap.size());
                    for (final Map.Entry<String, Object> entry : objectMap.entrySet()) {
                        final String key = entry.getKey();
                        final Object converted = convertToAvroObject(entry.getValue(), fieldSchema.getValueType(), fieldName + "[" + key + "]", charset);
                        map.put(key, converted);
                    }
                    return map;
                } else {
                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
                }
            }
            case RECORD: {
                final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);

                final Set<Map.Entry<String, Object>> entries;
                if (rawValue instanceof Map) {
                    final Map<String, Object> map = (Map<String, Object>) rawValue;
                    entries = map.entrySet();
                } else if (rawValue instanceof Record) {
                    entries = new HashSet<>();
                    final Record record = (Record) rawValue;
                    record.getSchema().getFields().forEach(
                        recordField -> entries.add(new AbstractMap.SimpleEntry<>(recordField.getFieldName(), record.getValue(recordField))));
                } else {
                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
                }

                for (final Map.Entry<String, Object> e : entries) {
                    final Object recordFieldValue = e.getValue();
                    final String recordFieldName = e.getKey();

                    // Entries whose name is not a field of the Avro schema are ignored.
                    final Field field = fieldSchema.getField(recordFieldName);
                    if (field == null) {
                        continue;
                    }

                    final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName, charset);
                    avroRecord.put(recordFieldName, converted);
                }
                return avroRecord;
            }
            case UNION: {
                return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
            }
            case ARRAY: {
                final Object[] objectArray;
                if (rawValue instanceof List) {
                    objectArray = ((List<?>) rawValue).toArray();
                } else if (rawValue instanceof Object[]) {
                    objectArray = (Object[]) rawValue;
                } else {
                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
                }

                final List<Object> list = new ArrayList<>(objectArray.length);
                int i = 0;
                for (final Object o : objectArray) {
                    final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]", charset);
                    list.add(converted);
                    i++;
                }
                return list;
            }
            case BOOLEAN: {
                return DataTypeUtils.toBoolean(rawValue, fieldName);
            }
            case DOUBLE: {
                return DataTypeUtils.toDouble(rawValue, fieldName);
            }
            case FLOAT: {
                return DataTypeUtils.toFloat(rawValue, fieldName);
            }
            case NULL: {
                return null;
            }
            case ENUM: {
                final List<String> enums = fieldSchema.getEnumSymbols();
                if (enums != null && enums.contains(rawValue)) {
                    return new GenericData.EnumSymbol(fieldSchema, rawValue);
                } else {
                    throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
                }
            }
            case STRING: {
                if (rawValue instanceof String) {
                    return rawValue;
                }

                return DataTypeUtils.toString(rawValue, (String) null, charset);
            }
        }

        // Schema types without a dedicated case are passed through unchanged.
        return rawValue;
    }