in nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java [705:906]
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName, final Charset charset) {
if (rawValue == null) {
return null;
}
switch (fieldSchema.getType()) {
case INT: {
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType == null) {
return DataTypeUtils.toInteger(rawValue, fieldName);
}
if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
final String format = determineDataType(fieldSchema).getFormat();
final FieldConverter<Object, LocalDate> fieldConverter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class);
final LocalDate localDate = fieldConverter.convertField(rawValue, Optional.ofNullable(format), fieldName);
return (int) localDate.toEpochDay();
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
final String format = determineDataType(fieldSchema).getFormat();
return getLogicalTimeMillis(rawValue, format, fieldName);
}
return DataTypeUtils.toInteger(rawValue, fieldName);
}
case LONG: {
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType == null) {
return DataTypeUtils.toLong(rawValue, fieldName);
}
if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) {
final long epochMilli = getLongFromTimestamp(rawValue, fieldSchema, fieldName);
final ZonedDateTime zonedDateTime = Instant.ofEpochMilli(epochMilli).atZone(ZoneId.systemDefault());
final ZonedDateTime midnight = zonedDateTime.truncatedTo(ChronoUnit.DAYS);
final Duration duration = Duration.between(midnight, zonedDateTime);
return duration.toMillis() * ONE_THOUSAND_MILLISECONDS;
} else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
return getLongFromTimestamp(rawValue, fieldSchema, fieldName);
} else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
return getLongFromTimestamp(rawValue, fieldSchema, fieldName) * ONE_THOUSAND_MILLISECONDS;
}
return DataTypeUtils.toLong(rawValue, fieldName);
}
case BYTES:
case FIXED:
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
final BigDecimal rawDecimal;
if (rawValue instanceof BigDecimal) {
rawDecimal = (BigDecimal) rawValue;
} else if (rawValue instanceof Double) {
rawDecimal = BigDecimal.valueOf((Double) rawValue);
} else if (rawValue instanceof String) {
rawDecimal = new BigDecimal((String) rawValue);
} else if (rawValue instanceof Integer) {
rawDecimal = new BigDecimal((Integer) rawValue);
} else if (rawValue instanceof Long) {
rawDecimal = new BigDecimal((Long) rawValue);
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
}
// If the desired scale is different than this value's coerce scale.
final int desiredScale = decimalType.getScale();
final BigDecimal decimal = rawDecimal.scale() == desiredScale
? rawDecimal : rawDecimal.setScale(desiredScale, RoundingMode.HALF_UP);
return fieldSchema.getType() == Type.BYTES
? new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType) //return GenericByte
: new Conversions.DecimalConversion().toFixed(decimal, fieldSchema, logicalType); //return GenericFixed
}
if (rawValue instanceof byte[]) {
return ByteBuffer.wrap((byte[]) rawValue);
}
if (rawValue instanceof String) {
return ByteBuffer.wrap(((String) rawValue).getBytes(charset));
}
if (rawValue instanceof Object[]) {
if (fieldSchema.getType() == Type.FIXED && "INT96".equals(fieldSchema.getName())) {
Object[] rawObjects = (Object[]) rawValue;
byte[] rawBytes = new byte[rawObjects.length];
for (int elementIndex = 0; elementIndex < rawObjects.length; elementIndex++) {
rawBytes[elementIndex] = (Byte) rawObjects[elementIndex];
}
return new GenericData.Fixed(fieldSchema, rawBytes);
} else {
return convertByteArray((Object[]) rawValue);
}
}
try {
if (rawValue instanceof Blob) {
Blob blob = (Blob) rawValue;
return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream()));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
}
} catch (IllegalTypeConversionException itce) {
throw itce;
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e);
}
case MAP:
if (rawValue instanceof Record) {
final Record recordValue = (Record) rawValue;
final Map<String, Object> map = new HashMap<>();
for (final RecordField recordField : recordValue.getSchema().getFields()) {
final Object v = recordValue.getValue(recordField);
if (v != null) {
map.put(recordField.getFieldName(), convertToAvroObject(v, fieldSchema.getValueType(), fieldName + "[" + recordField.getFieldName() + "]", charset));
}
}
return map;
} else if (rawValue instanceof Map) {
final Map<String, Object> objectMap = (Map<String, Object>) rawValue;
final Map<String, Object> map = new HashMap<>(objectMap.size());
for (final String s : objectMap.keySet()) {
final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]", charset);
map.put(s, converted);
}
return map;
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
}
case RECORD:
final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
final Set<Map.Entry<String, Object>> entries;
if (rawValue instanceof Map) {
final Map<String, Object> map = (Map<String, Object>) rawValue;
entries = map.entrySet();
} else if (rawValue instanceof Record) {
entries = new HashSet<>();
final Record record = (Record) rawValue;
record.getSchema().getFields().forEach(field -> entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(), record.getValue(field))));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
}
for (final Map.Entry<String, Object> e : entries) {
final Object recordFieldValue = e.getValue();
final String recordFieldName = e.getKey();
final Field field = fieldSchema.getField(recordFieldName);
if (field == null) {
continue;
}
final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName, charset);
avroRecord.put(recordFieldName, converted);
}
return avroRecord;
case UNION:
return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
case ARRAY:
final Object[] objectArray;
if (rawValue instanceof List) {
objectArray = ((List) rawValue).toArray();
} else if (rawValue instanceof Object[]) {
objectArray = (Object[]) rawValue;
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
}
final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) {
final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]", charset);
list.add(converted);
i++;
}
return list;
case BOOLEAN:
return DataTypeUtils.toBoolean(rawValue, fieldName);
case DOUBLE:
return DataTypeUtils.toDouble(rawValue, fieldName);
case FLOAT:
return DataTypeUtils.toFloat(rawValue, fieldName);
case NULL:
return null;
case ENUM:
List<String> enums = fieldSchema.getEnumSymbols();
if (enums != null && enums.contains(rawValue)) {
return new GenericData.EnumSymbol(fieldSchema, rawValue);
} else {
throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
}
case STRING:
if (rawValue instanceof String) {
return rawValue;
}
return DataTypeUtils.toString(rawValue, (String) null, charset);
}
return rawValue;
}