in hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java [377:510]
/**
 * Recursively converts a protobuf-side value into the Avro-side object described by {@code schema}.
 *
 * <p>The conversion is driven by the Avro schema type: containers (arrays, maps, records, unions) recurse into
 * their element/value/field schemas, while leaf types are unwrapped from protobuf wrapper messages where needed
 * and copied when they are mutable (bytes).
 *
 * @param schema the Avro schema the converted value must conform to
 * @param value  the protobuf value to convert; may be {@code null}
 * @return the converted value, or {@code null} when {@code value} is {@code null} or the schema type is NULL
 * @throws HoodieException       if the schema type is unsupported, or an unexpected message type is supplied
 *                               for a FIXED (unsigned long) schema
 * @throws HoodieSchemaException if a timestamp-micros LONG schema receives a message that is neither a
 *                               {@code Timestamp} nor a {@code DynamicMessage}
 */
private static Object convertObject(Schema schema, Object value) {
  if (value == null) {
    return null;
  }
  // if we've reached max recursion depth in the provided schema, write out message to bytes
  if (RECURSION_OVERFLOW_SCHEMA.getFullName().equals(schema.getFullName())) {
    GenericData.Record overflowRecord = new GenericData.Record(schema);
    Message messageValue = (Message) value;
    overflowRecord.put(OVERFLOW_DESCRIPTOR_FIELD_NAME, messageValue.getDescriptorForType().getFullName());
    overflowRecord.put(OVERFLOW_BYTES_FIELD_NAME, ByteBuffer.wrap(messageValue.toByteArray()));
    return overflowRecord;
  }
  switch (schema.getType()) {
    case ARRAY:
      List<Object> arrayValue = (List<Object>) value;
      List<Object> arrayCopy = new GenericData.Array<>(arrayValue.size(), schema);
      for (Object obj : arrayValue) {
        arrayCopy.add(convertObject(schema.getElementType(), obj));
      }
      return arrayCopy;
    case BYTES:
      ByteBuffer byteBufferValue;
      if (value instanceof ByteString) {
        byteBufferValue = ((ByteString) value).asReadOnlyByteBuffer();
      } else if (value instanceof Message) {
        // any Message reaching a BYTES schema is unwrapped and its wrapped value is expected to be a ByteString
        byteBufferValue = ((ByteString) getWrappedValue(value)).asReadOnlyByteBuffer();
      } else if (value instanceof byte[]) {
        byteBufferValue = ByteBuffer.wrap((byte[]) value);
      } else {
        byteBufferValue = (ByteBuffer) value;
      }
      // copy the remaining bytes into a fresh array, then restore the source buffer's position
      int start = byteBufferValue.position();
      int length = byteBufferValue.limit() - start;
      byte[] bytesCopy = new byte[length];
      byteBufferValue.get(bytesCopy, 0, length);
      byteBufferValue.position(start);
      return ByteBuffer.wrap(bytesCopy, 0, length);
    case ENUM:
      return GenericData.get().createEnum(value.toString(), schema);
    case FIXED:
      if (value instanceof byte[]) {
        return GenericData.get().createFixed(null, (byte[]) value, schema);
      }
      Object unsignedLongValue = value;
      if (unsignedLongValue instanceof UInt64Value) {
        // Unwrap UInt64Value
        unsignedLongValue = getWrappedValue(unsignedLongValue);
      } else if (unsignedLongValue instanceof Message) {
        // Unexpected message type
        throw new HoodieException("Unexpected Message type when converting as an unsigned long: " + unsignedLongValue.getClass().getName());
      }
      // convert the long to its unsigned value
      return DECIMAL_CONVERSION.toFixed(new BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema, schema.getLogicalType());
    case BOOLEAN:
    case DOUBLE:
    case FLOAT:
    case INT:
      if (value instanceof Message) {
        return getWrappedValue(value);
      }
      return value; // immutable
    case LONG:
      Object tmpValue = value;
      if (value instanceof Message) {
        // check if this is a Timestamp
        if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
          if (value instanceof Timestamp) {
            return Timestamps.toMicros((Timestamp) value);
          } else if (value instanceof DynamicMessage) {
            // rebuild a Timestamp from the dynamic message's seconds/nanos fields; fields absent from
            // getAllFields() keep the builder's defaults
            Timestamp.Builder builder = Timestamp.newBuilder();
            ((DynamicMessage) value).getAllFields().forEach((fieldDescriptor, fieldValue) -> {
              if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.seconds")) {
                builder.setSeconds((Long) fieldValue);
              } else if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.nanos")) {
                builder.setNanos((Integer) fieldValue);
              }
            });
            return Timestamps.toMicros(builder.build());
          } else {
            throw new HoodieSchemaException("Unexpected message type while handling timestamps: " + value.getClass().getName());
          }
        } else {
          tmpValue = getWrappedValue(value);
        }
      }
      // Unsigned ints are widened to long. Protobuf's Java runtime stores unsigned 32-bit values in a signed
      // int (the top bit lands in the sign bit), so the widening must be unsigned: a plain int -> long
      // conversion would sign-extend and turn values above Integer.MAX_VALUE into negative longs.
      if (tmpValue instanceof Integer) {
        tmpValue = Integer.toUnsignedLong((Integer) tmpValue);
      }
      return tmpValue;
    case MAP:
      Map<Object, Object> mapValue = (Map) value;
      Map<Object, Object> mapCopy = new HashMap<>(mapValue.size());
      for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
        mapCopy.put(convertObject(STRING_SCHEMA, entry.getKey()), convertObject(schema.getValueType(), entry.getValue()));
      }
      return mapCopy;
    case NULL:
      return null;
    case RECORD:
      GenericData.Record newRecord = new GenericData.Record(schema);
      Message messageValue = (Message) value;
      // orderedFields is indexed by Avro field position below
      Descriptors.FieldDescriptor[] orderedFields = getOrderedFields(schema, messageValue);
      for (Schema.Field field : schema.getFields()) {
        int position = field.pos();
        Descriptors.FieldDescriptor fieldDescriptor = orderedFields[position];
        Object convertedValue;
        Schema fieldSchema = field.schema();
        // if incoming message does not contain the field, fieldDescriptor will be null
        // if the field schema is a union, it is nullable
        if (fieldSchema.getType() == Schema.Type.UNION && (fieldDescriptor == null || (!fieldDescriptor.isRepeated() && !messageValue.hasField(fieldDescriptor)))) {
          convertedValue = null;
        } else {
          // non-nullable field missing from the message: fall back to the Avro field's default value
          convertedValue = convertObject(fieldSchema, fieldDescriptor == null ? field.defaultVal() : messageValue.getField(fieldDescriptor));
        }
        newRecord.put(position, convertedValue);
      }
      return newRecord;
    case STRING:
      if (value instanceof String) {
        return value;
      } else if (value instanceof StringValue) {
        return ((StringValue) value).getValue();
      } else {
        return new Utf8(value.toString());
      }
    case UNION:
      // Unions only occur for nullable fields when working with proto + avro and null is the first schema in the union
      return convertObject(schema.getTypes().get(1), value);
    default:
      throw new HoodieException("Proto to Avro conversion failed for schema \"" + schema + "\" and value \"" + value + "\"");
  }
}