in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverterUtils.java [174:457]
private Object toConnectValue(Schema schema, Object value) {
if (value == null || value == JsonProperties.NULL_VALUE) {
return null;
}
try {
if (schema == null) {
if (!(value instanceof IndexedRecord)) {
throw new DataException("Invalid Avro data for schemaless Connect data");
}
IndexedRecord recordValue = (IndexedRecord) value;
Object boolVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BOOLEAN_FIELD).pos());
if (boolVal != null) {
return toConnectValue(Schema.BOOLEAN_SCHEMA, boolVal);
}
Object bytesVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BYTES_FIELD).pos());
if (bytesVal != null) {
return toConnectValue(Schema.BYTES_SCHEMA, bytesVal);
}
Object doubleVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_DOUBLE_FIELD).pos());
if (doubleVal != null) {
return toConnectValue(Schema.FLOAT64_SCHEMA, doubleVal);
}
Object floatVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_FLOAT_FIELD).pos());
if (floatVal != null) {
return toConnectValue(Schema.FLOAT32_SCHEMA, floatVal);
}
Object intVal = recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_INT_FIELD).pos());
if (intVal != null) {
return toConnectValue(Schema.INT32_SCHEMA, intVal);
}
Object longVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_LONG_FIELD).pos());
if (longVal != null) {
return toConnectValue(Schema.INT64_SCHEMA, longVal);
}
Object stringVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_STRING_FIELD).pos());
if (stringVal != null) {
return toConnectValue(Schema.STRING_SCHEMA, stringVal);
}
Object arrayVal =
recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_ARRAY_FIELD).pos());
if (arrayVal != null) {
if (!(arrayVal instanceof Collection)) {
throw new DataException("Expected Collection for schemaless array field but found "
+ arrayVal.getClass().getName());
}
Collection<Object> arrayValCollection = (Collection<Object>) arrayVal;
List<Object> result = new ArrayList<>(arrayValCollection.size());
for (Object arrayValue : arrayValCollection) {
result.add(toConnectValue((Schema) null, arrayValue));
}
return result;
}
Object mapVal = recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_MAP_FIELD).pos());
if (mapVal != null) {
if (!(mapVal instanceof Collection)) {
throw new DataException(
"Expected List for schemaless map field but found " + mapVal.getClass().getName());
}
Collection<IndexedRecord> mapValueCollection = (Collection<IndexedRecord>) mapVal;
Map<Object, Object> result = new HashMap<>(mapValueCollection.size());
for (IndexedRecord mapValue : mapValueCollection) {
int avroKeyFieldIndex = mapValue.getSchema().getField(KEY_FIELD).pos();
int avroValueFieldIndex = mapValue.getSchema().getField(VALUE_FIELD).pos();
Object convertedKey = toConnectValue((Schema) null, mapValue.get(avroKeyFieldIndex));
Object convertedValue =
toConnectValue((Schema) null, mapValue.get(avroValueFieldIndex));
result.put(convertedKey, convertedValue);
}
return result;
}
return null;
}
Object converted = null;
switch (schema.type()) {
case INT32: {
Integer intValue = (Integer) value;
converted = intValue;
break;
}
case INT64: {
Long longValue = (Long) value;
converted = longValue;
break;
}
case FLOAT32: {
Float floatValue = (Float) value;
converted = floatValue;
break;
}
case FLOAT64: {
Double doubleValue = (Double) value;
converted = doubleValue;
break;
}
case BOOLEAN: {
Boolean boolValue = (Boolean) value;
converted = boolValue;
break;
}
case INT8:
converted = ((Integer) value).byteValue();
break;
case INT16:
converted = ((Integer) value).shortValue();
break;
case STRING:
if (value instanceof String) {
converted = value;
} else if (value instanceof CharSequence || value instanceof Enum) {
converted = value.toString();
} else {
throw new DataException(
"Invalid class for string type, expecting String or CharSequence, got "
+ value.getClass());
}
break;
case BYTES:
if (value instanceof byte[]) {
converted = ByteBuffer.wrap((byte[]) value);
} else if (value instanceof ByteBuffer) {
converted = value;
} else if (value instanceof GenericFixed) {
converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
} else {
throw new DataException(
"Invalid class for bytes type, expecting byte[] or ByteBuffer, got "
+ value.getClass());
}
break;
case ARRAY: {
Schema valueSchema = schema.valueSchema();
Collection<Object> valueSchemaCollection = (Collection<Object>) value;
List<Object> arrayValue = new ArrayList<>(valueSchemaCollection.size());
for (Object elem : valueSchemaCollection) {
arrayValue.add(toConnectValue(valueSchema, elem));
}
converted = arrayValue;
break;
}
case MAP: {
Schema keySchema = schema.keySchema();
Schema valueSchema = schema.valueSchema();
if (keySchema != null && keySchema.type() == Schema.Type.STRING
&& !keySchema.isOptional()) {
Map<CharSequence, Object> valueSchemaMap = (Map<CharSequence, Object>) value;
Map<CharSequence, Object> mapValue = new HashMap<>(valueSchemaMap.size());
for (Map.Entry<CharSequence, Object> entry : valueSchemaMap.entrySet()) {
mapValue.put(entry.getKey().toString(),
toConnectValue(valueSchema, entry.getValue()));
}
converted = mapValue;
} else {
Collection<IndexedRecord> original = (Collection<IndexedRecord>) value;
Map<Object, Object> mapValue = new HashMap<>(original.size());
for (IndexedRecord entry : original) {
int avroKeyFieldIndex = entry.getSchema().getField(KEY_FIELD).pos();
int avroValueFieldIndex = entry.getSchema().getField(VALUE_FIELD).pos();
Object convertedKey = toConnectValue(keySchema, entry.get(avroKeyFieldIndex));
Object convertedValue = toConnectValue(valueSchema, entry.get(avroValueFieldIndex));
mapValue.put(convertedKey, convertedValue);
}
converted = mapValue;
}
break;
}
case STRUCT: {
if (AVRO_TYPE_UNION.equals(schema.name())) {
Schema valueRecordSchema = null;
if (value instanceof IndexedRecord) {
IndexedRecord valueRecord = ((IndexedRecord) value);
valueRecordSchema = toConnectSchema(valueRecord.getSchema());
}
int index = 0;
for (Field field : schema.fields()) {
Schema fieldSchema = field.schema();
if (isInstanceOfAvroSchemaTypeForSimpleSchema(fieldSchema, value, index)
|| (valueRecordSchema != null && schemaEquals(valueRecordSchema, fieldSchema))) {
converted = new Struct(schema).put(unionMemberFieldName(fieldSchema, index),
toConnectValue(fieldSchema, value));
break;
}
index++;
}
if (converted == null) {
throw new DataException("Did not find matching union field for data");
}
} else if (value instanceof Map) {
Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
Struct result = new Struct(schema);
for (Field field : schema.fields()) {
String fieldName = field.name();
Object convertedFieldValue = toConnectValue(field.schema(),
original.getOrDefault(fieldName, field.schema().defaultValue()));
result.put(field, convertedFieldValue);
}
return result;
} else {
IndexedRecord valueRecord = (IndexedRecord) value;
Struct structValue = new Struct(schema);
for (Field field : schema.fields()) {
String fieldName = field.name();
int avroFieldIndex = valueRecord.getSchema().getField(fieldName).pos();
Object convertedFieldValue =
toConnectValue(field.schema(), valueRecord.get(avroFieldIndex));
structValue.put(field, convertedFieldValue);
}
converted = structValue;
}
break;
}
default:
throw new DataException("Unknown Connect schema type: " + schema.type());
}
if (schema.name() != null) {
String schemaNameLower = schema.name().toLowerCase();
switch (schemaNameLower) {
case "org.apache.kafka.connect.data.date":
if (!(value instanceof Integer)) {
throw new DataException(
"Invalid type for Time, underlying representation should be int32 but was "
+ value.getClass());
}
converted = Date.toLogical(schema, (int) value);
break;
case "org.apache.kafka.connect.data.time":
if (!(value instanceof Integer)) {
throw new DataException(
"Invalid type for Time, underlying representation should be int32 but was "
+ value.getClass());
}
converted = Time.toLogical(schema, (int) value);
break;
case "org.apache.kafka.connect.data.timestamp":
if (!(value instanceof Long)) {
throw new DataException(
"Invalid type for Timestamp, underlying representation should be int64 but was "
+ value.getClass());
}
converted = Timestamp.toLogical(schema, (long) value);
break;
case "org.apache.kafka.connect.data.decimal":
if (value instanceof byte[]) {
converted = Decimal.toLogical(schema, (byte[]) value);
} else if (value instanceof ByteBuffer) {
converted = Decimal.toLogical(schema, ((ByteBuffer) value).array());
} else {
throw new DataException(
"Invalid type for Decimal, underlying representation should be bytes but was "
+ value.getClass());
}
break;
default:
break;
}
}
return converted;
} catch (ClassCastException e) {
String schemaType = schema != null ? schema.type().toString() : "null";
throw new DataException("Invalid type for " + schemaType + ": " + value.getClass());
}
}