in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/utils/BigQueryAvroUtils.java [298:412]
/**
 * Converts the non-null Avro value of a REQUIRED BigQuery field into the Java object used for that
 * field in a {@code TableRow}.
 *
 * <p>The BigQuery type of {@code fieldSchema} selects the conversion. First, {@code avroType} must
 * be one of the Avro types registered for that BigQuery type in {@code BIG_QUERY_TO_AVRO_TYPES}.
 * The value is then turned into:
 *
 * <ul>
 *   <li>a {@code String}, for most scalar types;
 *   <li>a {@code Double}, for FLOAT/FLOAT64;
 *   <li>a {@code Boolean}, for BOOL/BOOLEAN;
 *   <li>the result of {@code convertGenericRecordToTableRow}, for RECORD/STRUCT.
 * </ul>
 *
 * <p>The input value is never mutated. In particular, {@link ByteBuffer} positions are left
 * untouched, so the same record can be converted more than once.
 *
 * @param avroType physical Avro type of the field's schema
 * @param avroLogicalType Avro logical type of the field's schema; may be null. It is consulted for
 *     DATE, TIME, TIMESTAMP and NUMERIC/BIGNUMERIC.
 * @param fieldSchema BigQuery schema of the field; its type selects the conversion
 * @param v the Avro value; must not be null because the field is REQUIRED
 * @return the converted value
 * @throws UnsupportedOperationException if the BigQuery type passes validation but has no
 *     conversion branch here
 */
private static Object convertRequiredField(
    Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
  // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
  // INTEGER type maps to an Avro LONG type.
  checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
  // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
  // is required, so it may not be null.
  String bqType = fieldSchema.getType();
  ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
  verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
  verify(
      expectedAvroTypes.contains(avroType),
      "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
      expectedAvroTypes,
      bqType,
      fieldSchema.getName(),
      avroType);
  // The logical type is only consulted where the physical Avro type alone is ambiguous:
  //   - DATE as INT and TIME as LONG;
  //   - TIMESTAMP (millis vs. micros);
  //   - NUMERIC/BIGNUMERIC as BYTES with a DECIMAL logical type.
  // For historical reasons, other types do not validate it.
  switch (bqType) {
    case "STRING":
      // Avro will use a CharSequence to represent String objects, but it may not always use
      // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
      verify(
          v instanceof CharSequence || v instanceof EnumSymbol,
          "Expected CharSequence (String) or EnumSymbol, got %s",
          v.getClass());
      return v.toString();
    case "DATETIME":
    case "GEOGRAPHY":
    case "JSON":
      verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
      return v.toString();
    case "DATE":
      if (avroType == Type.INT) {
        verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
        verifyNotNull(avroLogicalType, "Expected Date logical type");
        verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
        return formatDate((Integer) v);
      } else {
        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
        return v.toString();
      }
    case "TIME":
      if (avroType == Type.LONG) {
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
        verify(
            avroLogicalType instanceof LogicalTypes.TimeMicros,
            "Expected TimeMicros logical type");
        return formatTime((Long) v);
      } else {
        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
        return v.toString();
      }
    case "INTEGER":
    case "INT64":
      if (avroType == Type.INT) {
        verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
        // Widening an int to long does not change its decimal representation.
        return String.valueOf(((Integer) v).longValue());
      } else {
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        return ((Long) v).toString();
      }
    case "FLOAT":
    case "FLOAT64":
      // "FLOAT" is the legacy BigQuery name for FLOAT64. Accept it like the other aliases
      // handled in this switch (INTEGER/INT64, BOOL/BOOLEAN, RECORD/STRUCT).
      if (avroType == Type.INT) {
        verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
        return (Double) ((Integer) v).doubleValue();
      } else if (avroType == Type.LONG) {
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        return (Double) ((Long) v).doubleValue();
      } else if (avroType == Type.FLOAT) {
        verify(v instanceof Float, "Expected Float, got %s", v.getClass());
        return (Double) ((Float) v).doubleValue();
      } else {
        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
        return v;
      }
    case "NUMERIC":
    case "BIGNUMERIC":
      // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
      // converted back to Strings with precision and scale determined by the logical type.
      verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
      verifyNotNull(avroLogicalType, "Expected Decimal logical type");
      verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
      // Pass a duplicate so the conversion cannot advance the caller's buffer position.
      BigDecimal numericValue =
          new Conversions.DecimalConversion()
              .fromBytes(((ByteBuffer) v).duplicate(), Schema.create(avroType), avroLogicalType);
      return numericValue.toString();
    case "BOOL":
    case "BOOLEAN":
      verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
      return v;
    case "TIMESTAMP":
      // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
      // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
      verify(v instanceof Long, "Expected Long, got %s", v.getClass());
      long timestampMicros = (Long) v;
      if (avroLogicalType instanceof LogicalTypes.TimestampMillis) {
        // A timestamp-millis long counts milliseconds. Scale it to the microseconds that
        // formatTimestamp is given on the default path. multiplyExact fails loudly instead of
        // wrapping for values far outside BigQuery's supported range.
        timestampMicros = Math.multiplyExact(timestampMicros, 1000L);
      }
      return formatTimestamp(timestampMicros);
    case "RECORD":
    case "STRUCT":
      verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
      return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
    case "BYTES":
      verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
      // Read through a duplicate so the record's buffer position stays put; converting the same
      // record again must not underflow. Copy only the remaining bytes, which is correct even
      // when the buffer's position is not zero.
      ByteBuffer byteBuffer = ((ByteBuffer) v).duplicate();
      byte[] bytes = new byte[byteBuffer.remaining()];
      byteBuffer.get(bytes);
      return BaseEncoding.base64().encode(bytes);
    default:
      throw new UnsupportedOperationException(
          String.format(
              "Unexpected BigQuery field schema type %s for field named %s",
              fieldSchema.getType(), fieldSchema.getName()));
  }
}