in cassandra-four-zero-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/CqlToAvroSchemaConverterImplementation.java [181:261]
/**
 * Builds the Avro {@link Schema} that represents the given CQL type.
 *
 * <ul>
 *   <li>Reversed types are converted through their base type and flagged as reversed.</li>
 *   <li>Lists and sets become Avro arrays of the converted element type; sets are additionally flagged as sets.</li>
 *   <li>Maps become an Avro array of two-field records (key, value), flagged as a map.</li>
 *   <li>User defined types become a single Avro record with one field per UDT field, flagged as a UDT.</li>
 *   <li>Every other type is delegated to {@code convertLiteralType}.</li>
 * </ul>
 *
 * The resulting schema is always flagged with the CQL type string, and with the frozen flag when
 * {@code isFrozen} reports the type as frozen.
 *
 * NOTE(review): sibling nested maps/UDTs under the same parent (e.g. a map whose key and value are both maps)
 * are given the same record name and namespace here - confirm this cannot produce a name clash in Avro.
 *
 * @param columnCqlType CQL type to convert; may be a reversed wrapper
 * @param namespace     Avro namespace under which nested records are placed
 * @return the Avro schema for the type, annotated with the CQL-specific flags
 */
private static Schema schemaFrom(AbstractType<?> columnCqlType, String namespace)
{
    // Conversion works on the base type; the reversal itself is only recorded as a flag at the end
    final boolean reversed = columnCqlType.isReversed();
    final AbstractType<?> cqlType = reversed ? ((ReversedType<?>) columnCqlType).baseType : columnCqlType;

    final Schema schema;
    if (cqlType instanceof ListType || cqlType instanceof SetType)
    {
        // Lists and sets share the same array representation; only the set gets an extra marker
        final boolean isList = cqlType instanceof ListType;
        final String elementNamespace = namespace + '.' + cqlType.asCQL3Type().toString();
        final AbstractType<?> elementType;
        if (isList)
        {
            elementType = ((ListType<?>) cqlType).getElementsType();
        }
        else
        {
            elementType = ((SetType<?>) cqlType).getElementsType();
        }
        schema = SchemaBuilder.array()
                              .items(schemaFrom(elementType, elementNamespace));
        if (!isList)
        {
            AvroSchemas.flagArrayAsSet(schema);
        }
    }
    else if (cqlType instanceof MapType)
    {
        final MapType<?, ?> mapType = (MapType<?, ?>) cqlType;
        final String entryRecordName = AvroConstants.ARRAY_BASED_MAP_NAME;
        final String entryNamespace = namespace + '.' + entryRecordName;
        // A map is simulated as an array of (key, value) records; neither field has a default
        final Schema entryRecord = SchemaBuilder.builder()
                                                .record(entryRecordName)
                                                .namespace(entryNamespace)
                                                .fields()
                                                .name(ARRAY_BASED_MAP_KEY_NAME)
                                                .type(schemaFrom(mapType.getKeysType(), entryNamespace))
                                                .noDefault()
                                                .name(ARRAY_BASED_MAP_VALUE_NAME)
                                                .type(schemaFrom(mapType.getValuesType(), entryNamespace))
                                                .noDefault()
                                                .endRecord();
        schema = SchemaBuilder.builder()
                              .array()
                              .items(entryRecord);
        AvroSchemas.flagArrayAsMap(schema);
    }
    else if (cqlType instanceof UserType)
    {
        final UserType udt = (UserType) cqlType;
        final String udtRecordName = AvroConstants.RECORD_BASED_UDT_NAME;
        final String udtNamespace = namespace + '.' + udtRecordName;

        // Convert every field type up front, before the record itself is assembled
        final Schema[] fieldSchemas = new Schema[udt.fieldTypes().size()];
        int position = 0;
        for (AbstractType<?> fieldType : udt.fieldTypes())
        {
            fieldSchemas[position++] = schemaFrom(fieldType, udtNamespace);
        }

        // The UDT is represented by a single record holding one field per UDT field
        final SchemaBuilder.FieldAssembler<Schema> udtFields = SchemaBuilder.builder()
                                                                            .record(udtRecordName)
                                                                            .namespace(udtNamespace)
                                                                            .fields();
        for (int index = 0; index < fieldSchemas.length; index++)
        {
            udtFields.name(udt.fieldNameAsString(index))
                     .type(fieldSchemas[index])
                     .noDefault();
        }
        schema = udtFields.endRecord();
        AvroSchemas.flagAsUdt(schema);
    }
    else
    {
        // Anything that is not a collection or UDT is handled by the literal type conversion
        schema = convertLiteralType(cqlType, namespace);
    }

    // Annotate the schema with the CQL-specific properties of the (base) type
    AvroSchemas.flagCqlType(schema, cqlType.asCQL3Type().toString());
    if (isFrozen(cqlType))
    {
        AvroSchemas.flagFrozen(schema);
    }
    if (reversed)
    {
        AvroSchemas.flagReversed(schema);
    }
    return schema;
}