in projects/dataflow-gcs-avro-to-spanner-scd/src/main/java/com/google/cloud/solutions/dataflow/avrotospannerscd/transforms/AvroToStructFn.java [51:189]
private record GenericRecordConverter(GenericRecord record) {
/**
* Converts a GenericRecord to a Struct.
*
* <p>The GenericRecord is converted to a Struct by iterating over the fields and converting
* each of the fields values and schemas from Avro GenericRecord to Spanner Struct.
*
* @return Struct for the GenericRecord with matching data and data types.
*/
private Struct toStruct() {
Struct.Builder structBuilder = Struct.newBuilder();
Schema avroSchema = checkNotNull(record.getSchema(), "Input file Avro Schema is null.");
avroSchema
.getFields()
.forEach(field -> structBuilder.set(field.name()).to(getFieldValue(field)));
return structBuilder.build();
}
private Value getFieldValue(Field field) {
if (field.schema().getLogicalType() != null) {
return getLogicalFieldValue(field);
}
Schema.Type fieldType = field.schema().getType();
Object fieldValue = record.get(field.name());
return switch (fieldType) {
case BOOLEAN ->
Value.bool(fieldValue == null ? NullValues.NULL_BOOLEAN : (Boolean) fieldValue);
case BYTES, FIXED ->
Value.bytes(fieldValue == null ? NullValues.NULL_BYTES : (ByteArray) fieldValue);
case DOUBLE ->
Value.float64(fieldValue == null ? NullValues.NULL_FLOAT64 : (Double) fieldValue);
case FLOAT ->
Value.float32(fieldValue == null ? NullValues.NULL_FLOAT32 : (Float) fieldValue);
case INT ->
Value.int64(
fieldValue == null ? NullValues.NULL_INT64 : Long.valueOf((Integer) fieldValue));
case LONG -> Value.int64(fieldValue == null ? NullValues.NULL_INT64 : (Long) fieldValue);
case STRING ->
Value.string(fieldValue == null ? NullValues.NULL_STRING : fieldValue.toString());
case UNION -> getUnionFieldValue(field);
default ->
throw new UnsupportedOperationException(
String.format("Avro field type %s is not supported.", fieldType));
};
}
private Value getLogicalFieldValue(Field field) {
String logicalTypeName = field.schema().getLogicalType().getName();
Object fieldValue = record.get(field.name());
return switch (logicalTypeName) {
case "date" ->
Value.date(
fieldValue == null
? NullValues.NULL_DATE
: Date.fromJavaUtilDate(
java.util.Date.from(
new TimeConversions.DateConversion()
.fromInt(
(Integer) fieldValue,
field.schema(),
LogicalTypes.fromSchema(field.schema()))
.atStartOfDay()
.atZone(ZoneId.systemDefault())
.toInstant())));
case "decimal" ->
Value.numeric(
fieldValue == null
? NullValues.NULL_NUMERIC
: new Conversions.DecimalConversion()
.fromBytes(
convertToByteBuffer(fieldValue),
field.schema(),
LogicalTypes.fromSchema(field.schema())));
case "local-timestamp-millis", "timestamp-millis" ->
Value.timestamp(
fieldValue == null
? NullValues.NULL_TIMESTAMP
: Timestamp.ofTimeMicroseconds(
new TimeConversions.TimestampMillisConversion()
.fromLong(
(Long) fieldValue,
field.schema(),
LogicalTypes.fromSchema(field.schema()))
.toEpochMilli()
* 1000L));
case "local-timestamp-micros", "timestamp-micros" ->
Value.timestamp(
fieldValue == null
? NullValues.NULL_TIMESTAMP
: Timestamp.ofTimeMicroseconds(
new TimeConversions.TimestampMicrosConversion()
.fromLong(
(Long) fieldValue,
field.schema(),
LogicalTypes.fromSchema(field.schema()))
.toEpochMilli()
* 1000L));
// case "duration", "time-micros", "time-millis", "uuid"
default ->
throw new UnsupportedOperationException(
String.format(
"Avro logical field type %s on column %s is not supported.",
logicalTypeName, field.name()));
};
}
private static ByteBuffer convertToByteBuffer(Object fieldValue) {
return switch (fieldValue) {
case ByteBuffer byteBufferValue -> byteBufferValue;
case ByteArray byteArrayValue -> ByteBuffer.wrap(byteArrayValue.toByteArray());
case byte[] bytes -> ByteBuffer.wrap(bytes);
default ->
throw new UnsupportedOperationException("Unexpected value for decimal: " + fieldValue);
};
}
private Value getUnionFieldValue(Field field) {
List<Schema> unionTypes = field.schema().getTypes();
if (unionTypes.size() != 2) {
throw new UnsupportedOperationException(
String.format("UNION is only supported for nullable fields. Got: %s.", unionTypes));
}
// It is not possible to have UNION of same type (e.g. NULL, NULL).
if (unionTypes.get(0).getType() == Schema.Type.NULL) {
return getFieldValue(new Field(field.name(), unionTypes.get(1), field.doc()));
}
if (unionTypes.get(1).getType() == Schema.Type.NULL) {
return getFieldValue(new Field(field.name(), unionTypes.get(0), field.doc()));
}
throw new UnsupportedOperationException(
String.format("UNION is only supported for nullable fields. Got: %s.", unionTypes));
}
}