private record GenericRecordConverter()

in projects/dataflow-gcs-avro-to-spanner-scd/src/main/java/com/google/cloud/solutions/dataflow/avrotospannerscd/transforms/AvroToStructFn.java [51:189]


  private record GenericRecordConverter(GenericRecord record) {

    /**
     * Converts a GenericRecord to a Struct.
     *
     * <p>The GenericRecord is converted to a Struct by iterating over the fields and converting
     * each of the fields values and schemas from Avro GenericRecord to Spanner Struct.
     *
     * @return Struct for the GenericRecord with matching data and data types.
     */
    private Struct toStruct() {
      Struct.Builder structBuilder = Struct.newBuilder();
      Schema avroSchema = checkNotNull(record.getSchema(), "Input file Avro Schema is null.");
      avroSchema
          .getFields()
          .forEach(field -> structBuilder.set(field.name()).to(getFieldValue(field)));
      return structBuilder.build();
    }

    private Value getFieldValue(Field field) {
      if (field.schema().getLogicalType() != null) {
        return getLogicalFieldValue(field);
      }

      Schema.Type fieldType = field.schema().getType();
      Object fieldValue = record.get(field.name());

      return switch (fieldType) {
        case BOOLEAN ->
            Value.bool(fieldValue == null ? NullValues.NULL_BOOLEAN : (Boolean) fieldValue);
        case BYTES, FIXED ->
            Value.bytes(fieldValue == null ? NullValues.NULL_BYTES : (ByteArray) fieldValue);
        case DOUBLE ->
            Value.float64(fieldValue == null ? NullValues.NULL_FLOAT64 : (Double) fieldValue);
        case FLOAT ->
            Value.float32(fieldValue == null ? NullValues.NULL_FLOAT32 : (Float) fieldValue);
        case INT ->
            Value.int64(
                fieldValue == null ? NullValues.NULL_INT64 : Long.valueOf((Integer) fieldValue));
        case LONG -> Value.int64(fieldValue == null ? NullValues.NULL_INT64 : (Long) fieldValue);
        case STRING ->
            Value.string(fieldValue == null ? NullValues.NULL_STRING : fieldValue.toString());
        case UNION -> getUnionFieldValue(field);
        default ->
            throw new UnsupportedOperationException(
                String.format("Avro field type %s is not supported.", fieldType));
      };
    }

    private Value getLogicalFieldValue(Field field) {
      String logicalTypeName = field.schema().getLogicalType().getName();
      Object fieldValue = record.get(field.name());

      return switch (logicalTypeName) {
        case "date" ->
            Value.date(
                fieldValue == null
                    ? NullValues.NULL_DATE
                    : Date.fromJavaUtilDate(
                        java.util.Date.from(
                            new TimeConversions.DateConversion()
                                .fromInt(
                                    (Integer) fieldValue,
                                    field.schema(),
                                    LogicalTypes.fromSchema(field.schema()))
                                .atStartOfDay()
                                .atZone(ZoneId.systemDefault())
                                .toInstant())));
        case "decimal" ->
            Value.numeric(
                fieldValue == null
                    ? NullValues.NULL_NUMERIC
                    : new Conversions.DecimalConversion()
                        .fromBytes(
                            convertToByteBuffer(fieldValue),
                            field.schema(),
                            LogicalTypes.fromSchema(field.schema())));
        case "local-timestamp-millis", "timestamp-millis" ->
            Value.timestamp(
                fieldValue == null
                    ? NullValues.NULL_TIMESTAMP
                    : Timestamp.ofTimeMicroseconds(
                        new TimeConversions.TimestampMillisConversion()
                                .fromLong(
                                    (Long) fieldValue,
                                    field.schema(),
                                    LogicalTypes.fromSchema(field.schema()))
                                .toEpochMilli()
                            * 1000L));
        case "local-timestamp-micros", "timestamp-micros" ->
            Value.timestamp(
                fieldValue == null
                    ? NullValues.NULL_TIMESTAMP
                    : Timestamp.ofTimeMicroseconds(
                        new TimeConversions.TimestampMicrosConversion()
                                .fromLong(
                                    (Long) fieldValue,
                                    field.schema(),
                                    LogicalTypes.fromSchema(field.schema()))
                                .toEpochMilli()
                            * 1000L));
        // case "duration", "time-micros", "time-millis", "uuid"
        default ->
            throw new UnsupportedOperationException(
                String.format(
                    "Avro logical field type %s on column %s is not supported.",
                    logicalTypeName, field.name()));
      };
    }

    private static ByteBuffer convertToByteBuffer(Object fieldValue) {
      return switch (fieldValue) {
        case ByteBuffer byteBufferValue -> byteBufferValue;
        case ByteArray byteArrayValue -> ByteBuffer.wrap(byteArrayValue.toByteArray());
        case byte[] bytes -> ByteBuffer.wrap(bytes);
        default ->
            throw new UnsupportedOperationException("Unexpected value for decimal: " + fieldValue);
      };
    }

    private Value getUnionFieldValue(Field field) {
      List<Schema> unionTypes = field.schema().getTypes();
      if (unionTypes.size() != 2) {
        throw new UnsupportedOperationException(
            String.format("UNION is only supported for nullable fields. Got: %s.", unionTypes));
      }

      // It is not possible to have UNION of same type (e.g. NULL, NULL).
      if (unionTypes.get(0).getType() == Schema.Type.NULL) {
        return getFieldValue(new Field(field.name(), unionTypes.get(1), field.doc()));
      }
      if (unionTypes.get(1).getType() == Schema.Type.NULL) {
        return getFieldValue(new Field(field.name(), unionTypes.get(0), field.doc()));
      }

      throw new UnsupportedOperationException(
          String.format("UNION is only supported for nullable fields. Got: %s.", unionTypes));
    }
  }