in schema-converter/avro-schema-converter/src/main/java/org/apache/rocketmq/schema/avro/AvroData.java [1752:2044]
/**
 * Converts an Avro schema into a Connect {@link Schema}.
 *
 * <p>The Avro type selects the Connect builder; afterwards the Connect-specific
 * properties stored on the Avro schema (doc, version, parameters, default value and
 * name) are copied onto the builder, in that order.
 *
 * <p>A two-member union that contains the null schema is not wrapped in a struct:
 * the non-null member is converted as an optional schema and returned directly, so
 * none of the post-processing below the switch is applied to the union itself.
 *
 * @param schema           the Avro schema to convert
 * @param forceOptional    when {@code true} the resulting schema is marked optional
 * @param fieldDefaultVal  default value supplied by the enclosing Avro field, or
 *                         {@code null}; when {@code null} the schema's
 *                         {@code CONNECT_DEFAULT_VALUE} property is used instead
 * @param docDefaultVal    doc string that is only forwarded when unwrapping a
 *                         two-member nullable union
 * @param version          version to apply when the schema carries no
 *                         {@code CONNECT_VERSION} property; may be {@code null}
 * @param toConnectContext conversion state holding {@code cycleReferences} and
 *                         {@code detectedCycles}
 * @return the converted Connect schema
 * @throws ConnectException if the Avro type is unsupported (including a standalone
 *                          null schema), if a decimal scale/precision, version,
 *                          parameters object or name property has an unexpected
 *                          JSON type, if a map encoded as an array of records is
 *                          malformed, if two union members map to the same field
 *                          name, or if a version/name already set on the builder
 *                          conflicts with the one found on the Avro schema
 */
private Schema toConnectSchema(org.apache.avro.Schema schema,
                               boolean forceOptional,
                               Object fieldDefaultVal,
                               String docDefaultVal,
                               Integer version,
                               ToConnectContext toConnectContext) {
    String type = schema.getProp(CONNECT_TYPE);
    String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP);
    final SchemaBuilder builder;
    switch (schema.getType()) {
        case BOOLEAN:
            builder = SchemaBuilder.bool();
            break;
        case BYTES:
        case FIXED:
            if (AVRO_LOGICAL_DECIMAL.equalsIgnoreCase(logicalType)) {
                // instanceof is false for null, so this also rejects a missing scale.
                Object scaleNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP);
                if (!(scaleNode instanceof Number)) {
                    throw new ConnectException("scale must be specified and must be a number.");
                }
                Number scale = (Number) scaleNode;
                builder = Decimal.builder(scale.intValue());
                Object precisionNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP);
                if (null != precisionNode) {
                    if (!(precisionNode instanceof Number)) {
                        throw new ConnectException(AVRO_LOGICAL_DECIMAL_PRECISION_PROP
                            + " property must be a JSON Integer."
                            + " https://avro.apache.org/docs/1.9.1/spec.html#Decimal");
                    }
                    // Capture the precision as a parameter only if it is not the default.
                    // Kept as a primitive so the comparison is numeric even if the
                    // default constant is a boxed Integer.
                    final int precision = ((Number) precisionNode).intValue();
                    if (precision != CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT) {
                        builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
                            String.valueOf(precision));
                    }
                }
            } else {
                builder = SchemaBuilder.bytes();
            }
            // FIXED additionally records its size so it is not lost in the conversion.
            if (schema.getType() == org.apache.avro.Schema.Type.FIXED) {
                builder.parameter(CONNECT_AVRO_FIXED_SIZE, String.valueOf(schema.getFixedSize()));
            }
            break;
        case DOUBLE:
            builder = SchemaBuilder.float64();
            break;
        case FLOAT:
            builder = SchemaBuilder.float32();
            break;
        case INT:
            // INT is used for Connect's INT8, INT16, and INT32
            if (type == null && logicalType == null) {
                builder = SchemaBuilder.int32();
            } else if (logicalType != null) {
                // A logical type takes precedence over the Connect type annotation;
                // unrecognised logical types fall back to a plain int32.
                if (AVRO_LOGICAL_DATE.equalsIgnoreCase(logicalType)) {
                    builder = Date.builder();
                } else if (AVRO_LOGICAL_TIME_MILLIS.equalsIgnoreCase(logicalType)) {
                    builder = Time.builder();
                } else {
                    builder = SchemaBuilder.int32();
                }
            } else {
                FieldType connectType = NON_AVRO_TYPES_BY_TYPE_CODE.get(type);
                if (connectType == null) {
                    throw new ConnectException("Connect type annotation for Avro int field is null");
                }
                builder = new SchemaBuilder(connectType);
            }
            break;
        case LONG:
            if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
                builder = Timestamp.builder();
            } else {
                builder = SchemaBuilder.int64();
            }
            break;
        case STRING:
            builder = SchemaBuilder.string();
            break;
        case ARRAY:
            org.apache.avro.Schema elemSchema = schema.getElementType();
            // Special case for custom encoding of non-string maps as list of key-value records
            if (isMapEntry(elemSchema)) {
                if (elemSchema.getFields().size() != 2
                    || elemSchema.getField(KEY_FIELD) == null
                    || elemSchema.getField(VALUE_FIELD) == null) {
                    throw new ConnectException("Found map encoded as array of key-value pairs, but array "
                        + "elements do not match the expected format.");
                }
                builder = SchemaBuilder.map(
                    toConnectSchema(elemSchema.getField(KEY_FIELD).schema()),
                    toConnectSchema(elemSchema.getField(VALUE_FIELD).schema())
                );
            } else {
                Schema arraySchema = toConnectSchemaWithCycles(
                    schema.getElementType(), getForceOptionalDefault(),
                    null, null, toConnectContext);
                builder = SchemaBuilder.array(arraySchema);
            }
            break;
        case MAP:
            // Avro maps always have string keys; only the value schema is converted.
            builder = SchemaBuilder.map(
                SchemaBuilder.string().build(),
                toConnectSchemaWithCycles(
                    schema.getValueType(),
                    getForceOptionalDefault(),
                    null,
                    null,
                    toConnectContext
                )
            );
            break;
        case RECORD: {
            builder = SchemaBuilder.struct();
            // Registered before the fields are converted; presumably so that nested
            // references back to this record can be resolved - confirm against
            // toConnectSchemaWithCycles.
            toConnectContext.cycleReferences.put(schema, new CyclicSchemaWrapper(builder.build()));
            if (connectMetaData && schema.getDoc() != null) {
                builder.parameter(AVRO_RECORD_DOC, schema.getDoc());
            }
            for (org.apache.avro.Schema.Field field : schema.getFields()) {
                if (connectMetaData && field.doc() != null) {
                    builder.parameter(AVRO_FIELD_DOC_PREFIX + field.name(), field.doc());
                }
                Schema fieldSchema = toConnectSchema(field.schema(), getForceOptionalDefault(),
                    field.defaultVal(), field.doc(), toConnectContext);
                builder.field(field.name(), fieldSchema);
            }
            break;
        }
        case ENUM:
            // enums are unwrapped to strings and the original enum is not preserved
            builder = SchemaBuilder.string();
            if (connectMetaData) {
                if (schema.getDoc() != null) {
                    builder.parameter(AVRO_ENUM_DOC_PREFIX + schema.getName(), schema.getDoc());
                }
                if (schema.getEnumDefault() != null) {
                    builder.parameter(AVRO_ENUM_DEFAULT_PREFIX + schema.getName(),
                        schema.getEnumDefault());
                }
            }
            // The enum's full name and each of its symbols are kept as parameters.
            builder.parameter(AVRO_TYPE_ENUM, schema.getFullName());
            for (String enumSymbol : schema.getEnumSymbols()) {
                builder.parameter(AVRO_TYPE_ENUM + "." + enumSymbol, enumSymbol);
            }
            break;
        case UNION: {
            // A [null, X] / [X, null] union is returned directly as an optional X.
            if (schema.getTypes().size() == 2) {
                if (schema.getTypes().contains(NULL_SCHEMA)) {
                    for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
                        if (!memberSchema.equals(NULL_SCHEMA)) {
                            return toConnectSchemaWithCycles(
                                memberSchema, true, null, docDefaultVal, toConnectContext);
                        }
                    }
                }
            }
            // Any other union becomes a struct with one optional field per non-null member.
            builder = SchemaBuilder.struct().name(AVRO_TYPE_UNION);
            Set<String> fieldNames = new HashSet<>();
            for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
                if (memberSchema.getType() == org.apache.avro.Schema.Type.NULL) {
                    builder.optional();
                } else {
                    String fieldName = unionMemberFieldName(memberSchema, enhancedSchemaSupport);
                    // Set.add returns false when the name was already present.
                    if (!fieldNames.add(fieldName)) {
                        throw new ConnectException("Multiple union schemas map to the Connect union field name");
                    }
                    builder.field(
                        fieldName,
                        toConnectSchemaWithCycles(memberSchema, true, null, null, toConnectContext)
                    );
                }
            }
            break;
        }
        case NULL:
            throw new ConnectException("Standalone null schemas are not supported by this converter");
        default:
            throw new ConnectException("Couldn't translate unsupported schema type "
                + schema.getType().getName() + ".");
    }

    String docVal = schema.getProp(CONNECT_DOC);
    if (connectMetaData && docVal != null) {
        builder.doc(docVal);
    }

    // A valid version must be a non-negative integer; -1 means "no version available".
    // The schema's own CONNECT_VERSION property wins over the caller-supplied version.
    int versionInt = -1;
    Object versionNode = schema.getObjectProp(CONNECT_VERSION);
    if (versionNode != null) {
        if (!(versionNode instanceof Number)) {
            throw new ConnectException("Invalid Connect version found: " + versionNode);
        }
        versionInt = ((Number) versionNode).intValue();
    } else if (version != null) {
        versionInt = version.intValue();
    }
    if (versionInt >= 0) {
        if (builder.build().getVersion() != null) {
            if (versionInt != builder.build().getVersion()) {
                throw new ConnectException("Mismatched versions: version already added to SchemaBuilder "
                    + "("
                    + builder.build().getVersion()
                    + ") differs from version in source schema ("
                    + versionInt
                    + ")");
            }
        } else {
            builder.version(versionInt);
        }
    }

    // Copy the Connect parameters object; every value must be a JSON string.
    Object parameters = schema.getObjectProp(CONNECT_PARAMETERS);
    if (connectMetaData && parameters != null) {
        if (!(parameters instanceof Map)) {
            throw new ConnectException("Expected JSON object for schema parameters but found: "
                + parameters);
        }
        Iterator<Map.Entry<String, Object>> paramIt =
            ((Map<String, Object>) parameters).entrySet().iterator();
        while (paramIt.hasNext()) {
            Map.Entry<String, Object> field = paramIt.next();
            Object jsonValue = field.getValue();
            if (!(jsonValue instanceof String)) {
                throw new ConnectException("Expected schema parameter values to be strings but found: "
                    + jsonValue);
            }
            builder.parameter(field.getKey(), (String) jsonValue);
        }
    }

    // Carry over every schema property whose key starts with AVRO_PROP as a parameter.
    for (Map.Entry<String, Object> entry : schema.getObjectProps().entrySet()) {
        if (entry.getKey().startsWith(AVRO_PROP)) {
            builder.parameter(entry.getKey(), entry.getValue().toString());
        }
    }

    // Without a field-level default, fall back to the schema's Connect default.
    // With a field-level default and no Connect default, flag the schema accordingly.
    Object connectDefault = schema.getObjectProp(CONNECT_DEFAULT_VALUE);
    if (fieldDefaultVal == null) {
        fieldDefaultVal = JacksonUtils.toJsonNode(connectDefault);
    } else if (connectDefault == null) {
        builder.parameter(AVRO_FIELD_DEFAULT_FLAG, "true");
    }
    if (fieldDefaultVal != null) {
        builder.defaultValue(
            defaultValueFromAvro(builder.build(), schema, fieldDefaultVal, toConnectContext));
    }

    // An explicit Connect name wins; otherwise named Avro types use their full name.
    Object connectNameJson = schema.getObjectProp(CONNECT_NAME);
    String name = null;
    if (connectNameJson != null) {
        if (!(connectNameJson instanceof String)) {
            throw new ConnectException("Invalid schema name: " + connectNameJson);
        }
        name = (String) connectNameJson;
    } else if (schema.getType() == org.apache.avro.Schema.Type.RECORD
        || schema.getType() == org.apache.avro.Schema.Type.ENUM
        || schema.getType() == org.apache.avro.Schema.Type.FIXED) {
        name = schema.getFullName();
    }
    if (name != null && !name.equals(DEFAULT_SCHEMA_FULL_NAME)) {
        if (builder.build().getName() != null) {
            if (!name.equals(builder.build().getName())) {
                throw new ConnectException("Mismatched names: name already added to SchemaBuilder ("
                    + builder.build().getName()
                    + ") differs from name in source schema ("
                    + name + ")");
            }
        } else {
            builder.name(name);
        }
    }

    if (forceOptional) {
        builder.optional();
    }

    // Drop the cycle bookkeeping entry unless a cycle through this schema was detected.
    if (!toConnectContext.detectedCycles.contains(schema)) {
        toConnectContext.cycleReferences.remove(schema);
    }
    return builder.build();
}