private static Object convertRequiredField()

in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/utils/BigQueryAvroUtils.java [298:412]


  /**
   * Converts a non-null Avro value belonging to a REQUIRED field into the value emitted for the
   * matching BigQuery field.
   *
   * <p>The BigQuery type is read from {@code fieldSchema.getType()} and the Avro type is first
   * checked against {@code BIG_QUERY_TO_AVRO_TYPES}. The value is then converted as follows:
   *
   * <ul>
   *   <li>STRING: {@code toString()} of a {@link CharSequence} or {@code EnumSymbol}.
   *   <li>DATETIME, GEOGRAPHY, JSON: {@code toString()} of a {@link CharSequence}.
   *   <li>DATE: {@code formatDate} for an Avro INT carrying the Date logical type, otherwise the
   *       {@code toString()} of a {@link CharSequence}.
   *   <li>TIME: {@code formatTime} for an Avro LONG carrying the TimeMicros logical type,
   *       otherwise the {@code toString()} of a {@link CharSequence}.
   *   <li>INTEGER, INT64: the decimal string of the Integer or Long value.
   *   <li>FLOAT, FLOAT64: a {@link Double}, widened from Integer, Long or Float when needed.
   *   <li>NUMERIC, BIGNUMERIC: the {@code toString()} of the {@link BigDecimal} decoded from the
   *       bytes using the Decimal logical type.
   *   <li>BOOL, BOOLEAN: the {@link Boolean} itself.
   *   <li>TIMESTAMP: {@code formatTimestamp} applied to the Long value.
   *   <li>RECORD, STRUCT: {@code convertGenericRecordToTableRow} applied to the nested record.
   *   <li>BYTES: the Base64 encoding of the bytes between the buffer's position and limit.
   * </ul>
   *
   * @param avroType Avro schema type of the field
   * @param avroLogicalType Avro logical type of the field; may be null, and is only inspected for
   *     the DATE (INT), TIME (LONG) and NUMERIC/BIGNUMERIC branches
   * @param fieldSchema BigQuery schema of the field; supplies the type, name and nested fields
   * @param v the Avro value; must not be null
   * @return the converted value as described above
   * @throws UnsupportedOperationException if the BigQuery type passes the type-map lookup but has
   *     no conversion branch here
   */
  private static Object convertRequiredField(
      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
    // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
    // INTEGER type maps to an Avro LONG type.
    // NOTE(review): checkNotNull/verify/verifyNotNull are presumably Guava's Preconditions and
    // Verify helpers (NullPointerException / VerifyException on failure) - confirm against the
    // file's static imports.
    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
    // is required, so it may not be null.
    String bqType = fieldSchema.getType();
    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
    verify(
        expectedAvroTypes.contains(avroType),
        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
        expectedAvroTypes,
        bqType,
        fieldSchema.getName(),
        avroType);
    // For historical reasons, avroLogicalType is only validated where the conversion depends on
    // it: DATE encoded as INT, TIME encoded as LONG, and NUMERIC/BIGNUMERIC (BYTES with a DECIMAL
    // logical type).
    switch (bqType) {
      case "STRING":
        // Avro will use a CharSequence to represent String objects, but it may not always use
        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
        verify(
            v instanceof CharSequence || v instanceof EnumSymbol,
            "Expected CharSequence (String) or EnumSymbol, got %s",
            v.getClass());
        return v.toString();
      case "DATETIME":
      case "GEOGRAPHY":
      case "JSON":
        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
        return v.toString();
      case "DATE":
        if (avroType == Type.INT) {
          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
          verifyNotNull(avroLogicalType, "Expected Date logical type");
          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
          return formatDate((Integer) v);
        } else {
          // Any other accepted Avro type is expected to already carry the textual form.
          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
          return v.toString();
        }
      case "TIME":
        if (avroType == Type.LONG) {
          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
          verify(
              avroLogicalType instanceof LogicalTypes.TimeMicros,
              "Expected TimeMicros logical type");
          return formatTime((Long) v);
        } else {
          // Any other accepted Avro type is expected to already carry the textual form.
          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
          return v.toString();
        }
      case "INTEGER":
      case "INT64":
        // Integers are emitted as decimal strings regardless of the Avro width.
        if (avroType == Type.INT) {
          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
          return Long.toString(((Integer) v).longValue());
        } else {
          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
          return v.toString();
        }
      case "FLOAT":
        // FLOAT is the legacy spelling of FLOAT64; it only reaches this point when
        // BIG_QUERY_TO_AVRO_TYPES has an entry for it, and is then converted identically.
      case "FLOAT64":
        // Narrower numeric Avro types are widened so the result is always a Double.
        if (avroType == Type.INT) {
          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
          return ((Integer) v).doubleValue();
        } else if (avroType == Type.LONG) {
          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
          return ((Long) v).doubleValue();
        } else if (avroType == Type.FLOAT) {
          verify(v instanceof Float, "Expected Float, got %s", v.getClass());
          return ((Float) v).doubleValue();
        } else {
          verify(v instanceof Double, "Expected Double, got %s", v.getClass());
          return v;
        }
      case "NUMERIC":
      case "BIGNUMERIC":
        // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
        // converted back to Strings with precision and scale determined by the logical type.
        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
        verifyNotNull(avroLogicalType, "Expected Decimal logical type");
        verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
        BigDecimal numericValue =
            new Conversions.DecimalConversion()
                .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType);
        return numericValue.toString();
      case "BOOL":
      case "BOOLEAN":
        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
        return v;
      case "TIMESTAMP":
        // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
        // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        return formatTimestamp((Long) v);
      case "RECORD":
      case "STRUCT":
        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
      case "BYTES":
        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
        // Read through a duplicate so the caller's buffer position is left untouched, and size
        // the array by remaining() rather than limit() so a buffer whose position is not zero
        // does not trigger a BufferUnderflowException.
        ByteBuffer byteBuffer = ((ByteBuffer) v).duplicate();
        byte[] bytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(bytes);
        return BaseEncoding.base64().encode(bytes);
      default:
        throw new UnsupportedOperationException(
            String.format(
                "Unexpected BigQuery field schema type %s for field named %s",
                fieldSchema.getType(), fieldSchema.getName()));
    }
  }