in cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/AvroDataUtils.java [108:179]
public static Object toAvro(Object cassandraValue, Schema fieldSchema)
{
if (cassandraValue instanceof Byte)
{
return Integer.valueOf((Byte) cassandraValue);
}
else if (cassandraValue instanceof Short)
{
return Integer.valueOf((Short) cassandraValue);
}
else if (cassandraValue instanceof InetAddress)
{
return ByteBuffer.wrap(((InetAddress) cassandraValue).getAddress());
}
else if (cassandraValue instanceof Date)
{
Date date = (Date) cassandraValue;
return TimeUnit.MILLISECONDS.toMicros(date.getTime());
}
else if (cassandraValue instanceof Map)
{
Map<Object, Object> map = (Map<Object, Object>) cassandraValue;
Schema unwrapped = unwrapNullable(fieldSchema);
if (isRecordBasedUdt(unwrapped))
{
// udt type
GenericData.Record udtEntry = new GenericData.Record(unwrapped);
map.forEach((key, value) -> {
final String fieldName = key.toString(); // UDT key should always be the fieldName of type String
udtEntry.put(fieldName, toAvro(value, unwrapped.getField(fieldName).schema()));
});
return udtEntry;
}
else
{
// map type
return map.entrySet().stream().map(entry -> {
Schema mapEntrySchema = unwrapped.getElementType();
GenericData.Record mapEntry = new GenericData.Record(mapEntrySchema);
mapEntry.put(ARRAY_BASED_MAP_KEY_NAME,
toAvro(entry.getKey(), mapEntrySchema.getField(ARRAY_BASED_MAP_KEY_NAME).schema()));
mapEntry.put(ARRAY_BASED_MAP_VALUE_NAME,
toAvro(entry.getValue(), mapEntrySchema.getField(ARRAY_BASED_MAP_VALUE_NAME).schema()));
return mapEntry;
}).collect(Collectors.toList());
}
}
else if (cassandraValue instanceof Collection) // matches List and Set
{
Schema elementSchema = unwrapNullable(fieldSchema).getElementType();
return ((Collection<?>) cassandraValue).stream()
.map(value -> toAvro(value, elementSchema))
.collect(Collectors.toList());
}
else if (cassandraValue instanceof UUID)
{
return cassandraValue.toString();
}
else if (cassandraValue instanceof BigInteger)
{
Schema schema = unwrapNullable(fieldSchema);
// spark-avro converter set the field type as "FIXED"
return decimalConversions.toFixed(new BigDecimal((BigInteger) cassandraValue), schema, schema.getLogicalType());
}
else if (cassandraValue instanceof BigDecimal)
{
// spark-avro converter set the field type as "FIXED"
Schema schema = unwrapNullable(fieldSchema);
return decimalConversions.toFixed((BigDecimal) cassandraValue, schema, schema.getLogicalType());
}
return cassandraValue;
}