in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverterUtils.java [464:650]
public Schema toConnectSchema(org.apache.avro.Schema schema) {
String type = schema.getProp(CONNECT_TYPE_PROP);
String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP);
final SchemaBuilder builder;
switch (schema.getType()) {
case BOOLEAN:
builder = SchemaBuilder.bool();
break;
case BYTES:
case FIXED:
if (AVRO_LOGICAL_DECIMAL.equalsIgnoreCase(logicalType)) {
Object scaleNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP);
int scale = scaleNode instanceof Number ? ((Number) scaleNode).intValue() : 0;
builder = Decimal.builder(scale);
Object precisionNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP);
if (null != precisionNode) {
if (!(precisionNode instanceof Number)) {
throw new DataException(AVRO_LOGICAL_DECIMAL_PRECISION_PROP + " is not an integer.");
}
int percision = ((Number) precisionNode).intValue();
if (percision != CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT) {
builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP, String.valueOf(percision));
}
}
} else {
builder = SchemaBuilder.bytes();
}
if (schema.getType() == org.apache.avro.Schema.Type.FIXED) {
builder.parameter(CONNECT_AVRO_FIXED_SIZE_PROP, String.valueOf(schema.getFixedSize()));
}
break;
case DOUBLE:
builder = SchemaBuilder.float64();
break;
case FLOAT:
builder = SchemaBuilder.float32();
break;
case INT:
if (type == null && logicalType == null) {
builder = SchemaBuilder.int32();
} else if (logicalType != null) {
if (AVRO_LOGICAL_DATE.equalsIgnoreCase(logicalType)) {
builder = Date.builder();
} else if (AVRO_LOGICAL_TIME_MILLIS.equalsIgnoreCase(logicalType)) {
builder = Time.builder();
} else {
builder = SchemaBuilder.int32();
}
} else {
Schema.Type connectType = NON_AVRO_TYPES_BY_TYPE_CODE.get(type);
if (connectType == null) {
throw new DataException("Connect type for Avro int field is not found");
}
builder = SchemaBuilder.type(connectType);
}
break;
case LONG:
if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
builder = Timestamp.builder();
} else {
builder = SchemaBuilder.int64();
}
break;
case STRING:
builder = SchemaBuilder.string();
break;
case ARRAY:
org.apache.avro.Schema arrayElementSchema = schema.getElementType();
if (isMapEntry(arrayElementSchema)) {
if (arrayElementSchema.getFields().size() != 2
|| arrayElementSchema.getField(KEY_FIELD) == null
|| arrayElementSchema.getField(VALUE_FIELD) == null) {
throw new DataException("Expected array of key-value pairs");
}
builder =
SchemaBuilder.map(toConnectSchema(arrayElementSchema.getField(KEY_FIELD).schema()),
toConnectSchema(arrayElementSchema.getField(VALUE_FIELD).schema()));
} else {
Schema arraySchema = toConnectSchema(schema.getElementType());
builder = SchemaBuilder.array(arraySchema);
}
break;
case MAP:
builder = SchemaBuilder.map(Schema.STRING_SCHEMA, toConnectSchema(schema.getValueType()));
break;
case RECORD: {
builder = SchemaBuilder.struct();
for (org.apache.avro.Schema.Field field : schema.getFields()) {
Schema fieldSchema = toConnectSchema(field.schema());
builder.field(field.name(), fieldSchema);
}
break;
}
case ENUM:
builder = SchemaBuilder.string();
String paramName = AVRO_TYPE_ENUM;
builder.parameter(paramName, schema.getFullName());
for (String enumSymbol : schema.getEnumSymbols()) {
builder.parameter(paramName + "." + enumSymbol, enumSymbol);
}
break;
case UNION: {
if (schema.getTypes().size() == 2) {
if (schema.getTypes().contains(NULL_AVRO_SCHEMA)) {
for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
if (!memberSchema.equals(NULL_AVRO_SCHEMA)) {
return toConnectSchema(memberSchema);
}
}
}
}
String unionName = AVRO_TYPE_UNION;
builder = SchemaBuilder.struct().name(unionName);
Set<String> fieldNames = new HashSet<>();
int fieldIndex = 0;
for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
if (memberSchema.getType() == org.apache.avro.Schema.Type.NULL) {
builder.optional();
} else {
String fieldName = unionMemberFieldName(memberSchema, fieldIndex);
if (fieldNames.contains(fieldName)) {
throw new DataException("Multiple union schemas map to the Connect union field name");
}
fieldNames.add(fieldName);
builder.field(fieldName, toConnectSchema(memberSchema));
}
fieldIndex++;
}
break;
}
case NULL:
throw new DataException("Null schemas are not supported");
default:
throw new DataException("Unsupported schema type: " + schema.getType().getName());
}
Object parameters = schema.getObjectProp(CONNECT_PARAMETERS_PROP);
if (parameters != null) {
if (!(parameters instanceof Map)) {
throw new DataException("Expected JSON object for schema parameters. Got: " + parameters);
}
Map<String, Object> params = (Map<String, Object>) parameters;
for (Map.Entry<String, Object> entry : params.entrySet()) {
Object jsonValue = entry.getValue();
if (!(jsonValue instanceof String)) {
throw new DataException("Schema parameter value is not a string. Got: " + jsonValue);
}
builder.parameter(entry.getKey(), (String) jsonValue);
}
}
for (Map.Entry<String, Object> entry : schema.getObjectProps().entrySet()) {
if (entry.getKey().startsWith(AVRO_PROP)) {
builder.parameter(entry.getKey(), entry.getValue().toString());
}
}
Object connectNameJson = schema.getObjectProp(CONNECT_NAME_PROP);
String name = null;
if (connectNameJson != null) {
if (!(connectNameJson instanceof String)) {
throw new DataException("Invalid schema name: " + connectNameJson.toString());
}
name = (String) connectNameJson;
} else if (schema.getType() == org.apache.avro.Schema.Type.RECORD
|| schema.getType() == org.apache.avro.Schema.Type.ENUM
|| schema.getType() == org.apache.avro.Schema.Type.FIXED) {
name = schema.getFullName();
}
if (name != null && !name.startsWith(DEFAULT_SCHEMA_FULL_NAME)) {
if (builder.name() != null) {
if (!name.equals(builder.name())) {
throw new DataException("Schema name that has already been added to SchemaBuilder ("
+ builder.name() + ") differs from name in source schema (" + name + ")");
}
} else {
builder.name(name);
}
}
return builder.build();
}