public static Object toAvro()

in cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/AvroDataUtils.java [108:179]


    /**
     * Converts a Cassandra value into the object used to populate an Avro field of the given schema.
     *
     * <p>The runtime type of {@code cassandraValue} selects the conversion, checked in this order:
     * <ul>
     *   <li>{@link Byte} / {@link Short}: widened to {@link Integer}</li>
     *   <li>{@link InetAddress}: a {@link ByteBuffer} wrapping the raw address bytes</li>
     *   <li>{@link Date}: the epoch time converted from milliseconds to microseconds</li>
     *   <li>{@link Map}: a single record when the unwrapped schema is a record-based UDT,
     *       otherwise a list holding one key/value record per map entry</li>
     *   <li>{@link Collection}: a list of the recursively converted elements</li>
     *   <li>{@link UUID}: its string form</li>
     *   <li>{@link BigInteger} / {@link BigDecimal}: encoded through {@code decimalConversions.toFixed}</li>
     * </ul>
     * Every other value, {@code null} included, is handed back untouched.
     *
     * @param cassandraValue the value to convert
     * @param fieldSchema    Avro schema of the destination field; only consulted for maps,
     *                       collections and the two decimal types
     * @return the converted value, or {@code cassandraValue} itself when no conversion applies
     */
    public static Object toAvro(Object cassandraValue, Schema fieldSchema)
    {
        if (cassandraValue instanceof Byte || cassandraValue instanceof Short)
        {
            // both narrow integral types are represented as a boxed int
            return Integer.valueOf(((Number) cassandraValue).intValue());
        }

        if (cassandraValue instanceof InetAddress)
        {
            InetAddress address = (InetAddress) cassandraValue;
            return ByteBuffer.wrap(address.getAddress());
        }

        if (cassandraValue instanceof Date)
        {
            long epochMillis = ((Date) cassandraValue).getTime();
            return TimeUnit.MILLISECONDS.toMicros(epochMillis);
        }

        if (cassandraValue instanceof Map)
        {
            Map<?, ?> entries = (Map<?, ?>) cassandraValue;
            Schema containerSchema = unwrapNullable(fieldSchema);

            if (!isRecordBasedUdt(containerSchema))
            {
                // regular map: emitted as a list of records, each carrying one key and one value
                return entries.entrySet().stream().map(entry -> {
                    Schema pairSchema = containerSchema.getElementType();
                    GenericData.Record pair = new GenericData.Record(pairSchema);

                    Object rawKey = entry.getKey();
                    Schema keySchema = pairSchema.getField(ARRAY_BASED_MAP_KEY_NAME).schema();
                    pair.put(ARRAY_BASED_MAP_KEY_NAME, toAvro(rawKey, keySchema));

                    Object rawValue = entry.getValue();
                    Schema valueSchema = pairSchema.getField(ARRAY_BASED_MAP_VALUE_NAME).schema();
                    pair.put(ARRAY_BASED_MAP_VALUE_NAME, toAvro(rawValue, valueSchema));

                    return pair;
                }).collect(Collectors.toList());
            }

            // UDT: every map key is expected to be the (String) name of a field in the record schema
            GenericData.Record udtRecord = new GenericData.Record(containerSchema);
            for (Map.Entry<?, ?> udtField : entries.entrySet())
            {
                String fieldName = udtField.getKey().toString();
                Object converted = toAvro(udtField.getValue(), containerSchema.getField(fieldName).schema());
                udtRecord.put(fieldName, converted);
            }
            return udtRecord;
        }

        if (cassandraValue instanceof Collection) // covers both List and Set
        {
            Schema elementSchema = unwrapNullable(fieldSchema).getElementType();
            Collection<?> elements = (Collection<?>) cassandraValue;
            return elements.stream()
                           .map(element -> toAvro(element, elementSchema))
                           .collect(Collectors.toList());
        }

        if (cassandraValue instanceof UUID)
        {
            return ((UUID) cassandraValue).toString();
        }

        if (cassandraValue instanceof BigInteger || cassandraValue instanceof BigDecimal)
        {
            // per the original note, the spark-avro converter sets the field type to "FIXED"
            Schema decimalSchema = unwrapNullable(fieldSchema);
            BigDecimal decimal = cassandraValue instanceof BigInteger
                                 ? new BigDecimal((BigInteger) cassandraValue)
                                 : (BigDecimal) cassandraValue;
            return decimalConversions.toFixed(decimal, decimalSchema, decimalSchema.getLogicalType());
        }

        // no conversion needed (this is also where null ends up)
        return cassandraValue;
    }