in schema-converter/avro-schema-converter/src/main/java/org/apache/rocketmq/schema/avro/AvroData.java [306:564]
/**
 * Converts a Connect-style value into the object representation expected for the given
 * Avro schema, recursing into arrays, maps and structs.
 *
 * <p>The Connect field type is taken from {@code schema} when one is supplied; otherwise it
 * is derived from the Java type of {@code data} via
 * {@code schemaTypeForSchemalessJavaType}. Schemaless values are wrapped through
 * {@code maybeWrapSchemaless} using the matching {@code ANYTHING_SCHEMA_*_FIELD}.
 *
 * @param schema Connect schema describing {@code data}; may be {@code null} for schemaless data
 * @param avroSchema Avro schema the converted value is produced for
 * @param data value to convert; may be {@code null} when {@code schema} is absent or optional
 * @param requireContainer forwarded to {@code maybeAddContainer} for the converted value
 * @param requireSchemalessContainerNull when {@code true}, a schemaless {@code null} is
 *     returned as an empty {@code ANYTHING_SCHEMA} record instead of {@code null}
 * @param enhancedSchemaSupport when {@code true}, STRING values whose schema carries the
 *     {@code AVRO_TYPE_ENUM} parameter are converted to Avro enum symbols; also forwarded to
 *     {@code unionMemberFieldName} and to recursive calls
 * @return the converted value, or {@code null} for a null input that needs no container
 * @throws ConnectException if a non-optional schema receives {@code null}, the field type
 *     cannot be determined or is unsupported, a struct's schema does not match
 *     {@code schema}, a named enum or fixed type is missing from a union, or the value has
 *     the wrong Java type for its field type
 */
private static Object fromConnectData(
    Schema schema,
    org.apache.avro.Schema avroSchema,
    Object data,
    boolean requireContainer,
    boolean requireSchemalessContainerNull,
    boolean enhancedSchemaSupport
) {
    FieldType schemaType = schema != null
        ? schema.getFieldType()
        : schemaTypeForSchemalessJavaType(data);
    // No resolvable type and the caller requires a container: emit an empty "anything" record.
    if (schemaType == null && requireSchemalessContainerNull) {
        return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
    }
    if (data == null && schema != null && !schema.isOptional()) {
        throw new ConnectException("Found null value for non-optional schema");
    }
    if (data == null) {
        // A schemaless null may still have to be represented by a container record.
        if (schema == null && requireSchemalessContainerNull) {
            return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
        }
        return null;
    }
    // Switching on a null enum value would raise a bare NullPointerException; fail explicitly.
    if (schemaType == null) {
        throw new ConnectException("Unknown schema type for data of class " + data.getClass());
    }

    Object value = data;
    if (schema != null && schema.getName() != null) {
        // Convert logical type data
        LogicalTypeConverter logicalConverter = TO_AVRO_LOGICAL_CONVERTERS.get(schema.getName());
        if (logicalConverter != null) {
            value = logicalConverter.convert(schema, value);
        }
    }

    try {
        switch (schemaType) {
            case INT8: {
                // INT8 values are widened to Integer before being handed on.
                Byte byteValue = (Byte) value;
                Integer convertedByteValue = byteValue == null ? null : byteValue.intValue();
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, convertedByteValue, ANYTHING_SCHEMA_INT_FIELD),
                    requireContainer);
            }
            case INT16: {
                // INT16 values are widened to Integer before being handed on.
                Short shortValue = (Short) value;
                Integer convertedShortValue = shortValue == null ? null : shortValue.intValue();
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, convertedShortValue, ANYTHING_SCHEMA_INT_FIELD),
                    requireContainer);
            }
            case INT32:
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_INT_FIELD),
                    requireContainer);
            case INT64:
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_LONG_FIELD),
                    requireContainer);
            case FLOAT32:
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_FLOAT_FIELD),
                    requireContainer);
            case FLOAT64:
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_DOUBLE_FIELD),
                    requireContainer);
            case BOOLEAN:
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BOOLEAN_FIELD),
                    requireContainer);
            case STRING:
                if (enhancedSchemaSupport
                    && schema != null
                    && schema.getParameters() != null
                    && schema.getParameters().containsKey(AVRO_TYPE_ENUM)) {
                    String enumSchemaName = schema.getParameters().get(AVRO_TYPE_ENUM);
                    org.apache.avro.Schema enumSchema;
                    if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
                        // getIndexNamed yields null when the union has no member of that name;
                        // check it instead of letting auto-unboxing throw.
                        Integer enumIndex = avroSchema.getIndexNamed(enumSchemaName);
                        if (enumIndex == null) {
                            throw new ConnectException(
                                "Enum " + enumSchemaName + " not in union " + avroSchema);
                        }
                        enumSchema = avroSchema.getTypes().get(enumIndex);
                    } else {
                        enumSchema = avroSchema;
                    }
                    value = new GenericData.EnumSymbol(enumSchema, (String) value);
                }
                // string or enum
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_STRING_FIELD),
                    requireContainer);
            case BYTES: {
                value = value instanceof byte[]
                    ? ByteBuffer.wrap((byte[]) value)
                    : (ByteBuffer) value;
                if (schema != null && isFixedSchema(schema)) {
                    // Get fixed size
                    int size = Integer.parseInt(schema.getParameters().get(CONNECT_AVRO_FIXED_SIZE));
                    org.apache.avro.Schema fixedSchema = null;
                    if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
                        // Pick the union member that is a FIXED of the same size and name.
                        for (org.apache.avro.Schema memberSchema : avroSchema.getTypes()) {
                            if (memberSchema.getType() == org.apache.avro.Schema.Type.FIXED
                                && memberSchema.getFixedSize() == size
                                && unionMemberFieldName(memberSchema, enhancedSchemaSupport)
                                    .equals(schema.getName())) {
                                fixedSchema = memberSchema;
                            }
                        }
                        if (fixedSchema == null) {
                            throw new ConnectException("Fixed size " + size + " not in union " + avroSchema);
                        }
                    } else {
                        fixedSchema = avroSchema;
                    }
                    // NOTE(review): array() exposes the whole backing array and ignores the
                    // buffer's position/limit; it also fails for read-only or direct buffers.
                    value = new GenericData.Fixed(fixedSchema, ((ByteBuffer) value).array());
                }
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BYTES_FIELD),
                    requireContainer);
            }
            case ARRAY: {
                Collection<Object> list = (Collection<Object>) value;
                List<Object> converted = new ArrayList<>(list.size());
                Schema elementSchema = schema != null ? schema.getValueSchema() : null;
                org.apache.avro.Schema underlyingAvroSchema =
                    avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema);
                org.apache.avro.Schema elementAvroSchema =
                    schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA;
                for (Object val : list) {
                    converted.add(
                        fromConnectData(
                            elementSchema,
                            elementAvroSchema,
                            val,
                            false,
                            true,
                            enhancedSchemaSupport
                        )
                    );
                }
                return maybeAddContainer(
                    avroSchema,
                    maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_ARRAY_FIELD),
                    requireContainer);
            }
            case MAP: {
                Map<Object, Object> map = (Map<Object, Object>) value;
                org.apache.avro.Schema underlyingAvroSchema;
                if (schema != null
                    && schema.getKeySchema().getFieldType() == FieldType.STRING
                    && !schema.getKeySchema().isOptional()) {
                    // Required string keys: produce a plain String-keyed map.
                    // TODO most types don't need a new converted object since types pass through
                    underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema);
                    Map<String, Object> converted = new HashMap<>();
                    for (Map.Entry<Object, Object> entry : map.entrySet()) {
                        // Key is a String, no conversion needed
                        Object convertedValue = fromConnectData(
                            schema.getValueSchema(),
                            underlyingAvroSchema.getValueType(),
                            entry.getValue(),
                            false,
                            true,
                            enhancedSchemaSupport
                        );
                        converted.put((String) entry.getKey(), convertedValue);
                    }
                    return maybeAddContainer(avroSchema, converted, requireContainer);
                } else {
                    // Any other key type (or schemaless): produce a list of key/value records.
                    List<GenericRecord> converted = new ArrayList<>(map.size());
                    underlyingAvroSchema = avroSchemaForUnderlyingMapEntryType(schema, avroSchema);
                    org.apache.avro.Schema elementSchema =
                        schema != null
                            ? underlyingAvroSchema.getElementType()
                            : ANYTHING_SCHEMA_MAP_ELEMENT;
                    org.apache.avro.Schema avroKeySchema = elementSchema.getField(KEY_FIELD).schema();
                    org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema();
                    for (Map.Entry<Object, Object> entry : map.entrySet()) {
                        Object keyConverted = fromConnectData(
                            schema != null ? schema.getKeySchema() : null,
                            avroKeySchema, entry.getKey(), false, true,
                            enhancedSchemaSupport);
                        Object valueConverted = fromConnectData(
                            schema != null ? schema.getValueSchema() : null,
                            avroValueSchema, entry.getValue(), false,
                            true, enhancedSchemaSupport);
                        converted.add(
                            new GenericRecordBuilder(elementSchema)
                                .set(KEY_FIELD, keyConverted)
                                .set(VALUE_FIELD, valueConverted)
                                .build()
                        );
                    }
                    return maybeAddContainer(
                        avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD),
                        requireContainer);
                }
            }
            case STRUCT: {
                Struct struct = (Struct) value;
                if (!struct.schema().equals(schema)) {
                    throw new ConnectException("Mismatching struct schema");
                }
                // This handles the inverting of a union which is held as a struct, where each
                // field is one of the union types.
                if (AVRO_TYPE_UNION.equals(schema.getName())) {
                    // The first non-null field is the value carried by the union.
                    for (Field field : schema.getFields()) {
                        Object object = struct.get(field);
                        if (object != null) {
                            return fromConnectData(
                                field.getSchema(),
                                avroSchema,
                                object,
                                false,
                                true,
                                enhancedSchemaSupport
                            );
                        }
                    }
                    // Every field was null: convert as a null value of the union schema.
                    return fromConnectData(schema, avroSchema, null, false, true, enhancedSchemaSupport);
                } else {
                    org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
                        schema, avroSchema);
                    GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
                    for (Field field : schema.getFields()) {
                        org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(field.getName());
                        org.apache.avro.Schema fieldAvroSchema = theField.schema();
                        convertedBuilder.set(
                            field.getName(),
                            fromConnectData(field.getSchema(), fieldAvroSchema, struct.get(field), false,
                                true, enhancedSchemaSupport)
                        );
                    }
                    return convertedBuilder.build();
                }
            }
            default:
                // Use the resolved type: schema itself may be null for schemaless data.
                throw new ConnectException("Unknown schema type: " + schemaType);
        }
    } catch (ClassCastException e) {
        // Use the resolved type (schema may be null) and keep the original cause.
        throw new ConnectException("Invalid type for " + schemaType + ": " + value.getClass(), e);
    }
}