in schema-converter/avro-schema-converter/src/main/java/org/apache/rocketmq/schema/avro/AvroData.java [1421:1703]
/**
 * Converts an Avro runtime value into its Connect representation.
 *
 * <p>When {@code schema} is {@code null} the value is treated as schemaless data: it must be an
 * {@link IndexedRecord} laid out like {@code ANYTHING_SCHEMA}, and the first populated field
 * (checked in the order boolean, bytes, double, float, int, long, string, array, map) is
 * unwrapped and converted. When a schema is given, the value is converted according to
 * {@code schema.getFieldType()}.
 *
 * <p>Nested values (array elements, map keys/values, struct fields, union members) are converted
 * through the three-argument {@code toConnectData} overload, so {@code doLogicalConversion}
 * only controls the logical-type conversion of the value handled by this invocation.
 *
 * @param schema              Connect schema describing {@code value}, or {@code null} for
 *                            schemaless data
 * @param value               the Avro value to convert; may be {@code null}
 * @param toConnectContext    conversion context passed through to nested conversions
 * @param doLogicalConversion whether a converter registered in
 *                            {@code TO_CONNECT_LOGICAL_CONVERTERS} under the schema name should be
 *                            applied to the converted value
 * @return the converted value; {@code null} when {@code value} is {@code null} or when a
 *         schemaless record has no populated field
 * @throws ConnectException if {@code value} is {@code null} for a non-optional schema, if the
 *                          runtime class of {@code value} does not fit the schema, if no union
 *                          member matches, if an Avro record lacks a field of the Connect
 *                          struct, or if the schema type is not handled
 */
private Object toConnectData(Schema schema,
                             Object value,
                             ToConnectContext toConnectContext,
                             boolean doLogicalConversion) {
    if (value == null && schema != null && !schema.isOptional()) {
        throw new ConnectException("Found null value for non-optional schema");
    }
    if (value == null) {
        return null;
    }
    try {
        // If we're decoding schemaless data, we need to unwrap it into just the single value
        if (schema == null) {
            if (!(value instanceof IndexedRecord)) {
                throw new ConnectException("Invalid Avro data for schemaless Connect data");
            }
            IndexedRecord recordValue = (IndexedRecord) value;

            Object boolVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BOOLEAN_FIELD).pos());
            if (boolVal != null) {
                return toConnectData(SchemaBuilder.bool().build(), boolVal, toConnectContext);
            }

            Object bytesVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BYTES_FIELD).pos());
            if (bytesVal != null) {
                return toConnectData(SchemaBuilder.bytes().build(), bytesVal, toConnectContext);
            }

            Object dblVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_DOUBLE_FIELD).pos());
            if (dblVal != null) {
                return toConnectData(SchemaBuilder.float64().build(), dblVal, toConnectContext);
            }

            Object fltVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_FLOAT_FIELD).pos());
            if (fltVal != null) {
                return toConnectData(SchemaBuilder.float32().build(), fltVal, toConnectContext);
            }

            Object intVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_INT_FIELD).pos());
            if (intVal != null) {
                return toConnectData(SchemaBuilder.int32().build(), intVal, toConnectContext);
            }

            Object longVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_LONG_FIELD).pos());
            if (longVal != null) {
                return toConnectData(SchemaBuilder.int64().build(), longVal, toConnectContext);
            }

            Object stringVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_STRING_FIELD).pos());
            if (stringVal != null) {
                return toConnectData(SchemaBuilder.string().build(), stringVal, toConnectContext);
            }

            Object arrayVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_ARRAY_FIELD).pos());
            if (arrayVal != null) {
                // We cannot reuse the logic like we do in other cases because it is not possible to
                // construct an array schema with a null item schema, but the items have no schema.
                if (!(arrayVal instanceof Collection)) {
                    throw new ConnectException(
                        "Expected a Collection for schemaless array field but found a "
                            + arrayVal.getClass().getName()
                    );
                }
                Collection<Object> original = (Collection<Object>) arrayVal;
                List<Object> result = new ArrayList<>(original.size());
                for (Object elem : original) {
                    result.add(toConnectData(null, elem, toConnectContext));
                }
                return result;
            }

            Object mapVal =
                recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_MAP_FIELD).pos());
            if (mapVal != null) {
                // We cannot reuse the logic like we do in other cases because it is not possible to
                // construct a map schema with a null item schema, but the items have no schema.
                if (!(mapVal instanceof Collection)) {
                    throw new ConnectException(
                        "Expected a List for schemaless map field but found a "
                            + mapVal.getClass().getName()
                    );
                }
                // Schemaless maps are encoded as a collection of key/value entry records.
                Collection<IndexedRecord> original = (Collection<IndexedRecord>) mapVal;
                Map<Object, Object> result = new HashMap<>(original.size());
                for (IndexedRecord entry : original) {
                    int avroKeyFieldIndex = entry.getSchema().getField(KEY_FIELD).pos();
                    int avroValueFieldIndex = entry.getSchema().getField(VALUE_FIELD).pos();
                    Object convertedKey = toConnectData(
                        null, entry.get(avroKeyFieldIndex), toConnectContext);
                    Object convertedValue = toConnectData(
                        null, entry.get(avroValueFieldIndex), toConnectContext);
                    result.put(convertedKey, convertedValue);
                }
                return result;
            }

            // If nothing was set, it's null
            return null;
        }

        Object converted = null;
        switch (schema.getFieldType()) {
            // Pass-through types: the cast only validates the runtime class; a mismatch raises
            // ClassCastException, which is translated into a ConnectException below.
            case INT32:
                converted = (Integer) value;
                break;
            case INT64:
                converted = (Long) value;
                break;
            case FLOAT32:
                converted = (Float) value;
                break;
            case FLOAT64:
                converted = (Double) value;
                break;
            case BOOLEAN:
                converted = (Boolean) value;
                break;
            case INT8:
                // Encoded as an Integer
                converted = ((Integer) value).byteValue();
                break;
            case INT16:
                // Encoded as an Integer
                converted = ((Integer) value).shortValue();
                break;
            case STRING:
                if (value instanceof String) {
                    converted = value;
                } else if (value instanceof CharSequence
                    || value instanceof GenericEnumSymbol
                    || value instanceof Enum) {
                    converted = value.toString();
                } else {
                    throw new ConnectException("Invalid class for string type, expecting String or "
                        + "CharSequence but found " + value.getClass());
                }
                break;
            case BYTES:
                if (value instanceof byte[]) {
                    converted = ByteBuffer.wrap((byte[]) value);
                } else if (value instanceof ByteBuffer) {
                    converted = value;
                } else if (value instanceof GenericFixed) {
                    converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
                } else {
                    throw new ConnectException("Invalid class for bytes type, expecting byte[] or ByteBuffer "
                        + "but found " + value.getClass());
                }
                break;
            case ARRAY: {
                Schema valueSchema = schema.getValueSchema();
                Collection<Object> original = (Collection<Object>) value;
                List<Object> result = new ArrayList<>(original.size());
                for (Object elem : original) {
                    result.add(toConnectData(valueSchema, elem, toConnectContext));
                }
                converted = result;
                break;
            }
            case MAP: {
                Schema keySchema = schema.getKeySchema();
                Schema valueSchema = schema.getValueSchema();
                if (keySchema != null
                    && keySchema.getFieldType() == FieldType.STRING
                    && !keySchema.isOptional()) {
                    // Non-optional string keys
                    Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
                    Map<CharSequence, Object> result = new HashMap<>(original.size());
                    for (Map.Entry<CharSequence, Object> entry : original.entrySet()) {
                        result.put(entry.getKey().toString(),
                            toConnectData(valueSchema, entry.getValue(), toConnectContext));
                    }
                    converted = result;
                } else {
                    // Arbitrary keys: the value is a collection of key/value entry records
                    Collection<IndexedRecord> original = (Collection<IndexedRecord>) value;
                    Map<Object, Object> result = new HashMap<>(original.size());
                    for (IndexedRecord entry : original) {
                        int avroKeyFieldIndex = entry.getSchema().getField(KEY_FIELD).pos();
                        int avroValueFieldIndex = entry.getSchema().getField(VALUE_FIELD).pos();
                        Object convertedKey = toConnectData(
                            keySchema, entry.get(avroKeyFieldIndex), toConnectContext);
                        Object convertedValue = toConnectData(
                            valueSchema, entry.get(avroValueFieldIndex), toConnectContext);
                        result.put(convertedKey, convertedValue);
                    }
                    converted = result;
                }
                break;
            }
            case STRUCT: {
                // Special case support for union types
                if (schema.getName() != null && schema.getName().equals(AVRO_TYPE_UNION)) {
                    Schema valueRecordSchema = null;
                    if (value instanceof IndexedRecord) {
                        IndexedRecord valueRecord = (IndexedRecord) value;
                        valueRecordSchema = toConnectSchemaWithCycles(
                            valueRecord.getSchema(), true, null, null, toConnectContext);
                    }
                    // The first member whose schema matches the value wins.
                    for (Field field : schema.getFields()) {
                        Schema fieldSchema = field.getSchema();
                        if (isInstanceOfAvroSchemaTypeForSimpleSchema(
                            fieldSchema, value, enhancedSchemaSupport)
                            || (valueRecordSchema != null && schemaEquals(valueRecordSchema, fieldSchema))) {
                            converted = new Struct(schema).put(
                                unionMemberFieldName(fieldSchema, enhancedSchemaSupport),
                                toConnectData(fieldSchema, value, toConnectContext));
                            break;
                        }
                    }
                    if (converted == null) {
                        throw new ConnectException(
                            "Did not find matching union field for data: " + value);
                    }
                } else if (value instanceof Map) {
                    // Default values from Avro are returned as Map
                    Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
                    Struct result = new Struct(schema);
                    for (Field field : schema.getFields()) {
                        Object convertedFieldValue =
                            toConnectData(field.getSchema(), original.get(field.getName()), toConnectContext);
                        result.put(field, convertedFieldValue);
                    }
                    // Returned directly: the logical-type conversion below is not applied here.
                    return result;
                } else {
                    IndexedRecord original = (IndexedRecord) value;
                    Struct result = new Struct(schema);
                    for (Field field : schema.getFields()) {
                        // Fail with a descriptive error instead of a NullPointerException when the
                        // Avro record does not carry a field that the Connect struct declares.
                        if (original.getSchema().getField(field.getName()) == null) {
                            throw new ConnectException(
                                "Avro record is missing field '" + field.getName()
                                    + "' declared by the Connect struct schema");
                        }
                        int avroFieldIndex = original.getSchema().getField(field.getName()).pos();
                        Object convertedFieldValue =
                            toConnectData(field.getSchema(), original.get(avroFieldIndex), toConnectContext);
                        result.put(field, convertedFieldValue);
                    }
                    converted = result;
                }
                break;
            }
            default:
                throw new ConnectException("Unknown Connect schema type: " + schema.getFieldType());
        }

        if (schema.getName() != null && doLogicalConversion) {
            LogicalTypeConverter logicalConverter = TO_CONNECT_LOGICAL_CONVERTERS.get(schema.getName());
            if (logicalConverter != null) {
                converted = logicalConverter.convert(schema, converted);
            }
        }
        return converted;
    } catch (ClassCastException e) {
        // Report the schema *type* (the schema name is null for most schemas) and keep the
        // original exception as the cause so the failing cast remains visible in the stack trace.
        String schemaType = schema != null ? String.valueOf(schema.getFieldType()) : "null";
        throw new ConnectException("Invalid type for " + schemaType + ": " + value.getClass(), e);
    }
}