in v1/src/main/java/com/google/cloud/teleport/spanner/AvroRecordConverter.java [47:234]
public Mutation apply(GenericRecord record) {
Schema schema = record.getSchema();
List<Schema.Field> fields = schema.getFields();
Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table.name());
for (Schema.Field field : fields) {
String fieldName = field.name();
Column column = table.column(fieldName);
if (column == null) {
throw new IllegalArgumentException(
String.format(
"Cannot find corresponding column for field %s in table %s schema %s",
fieldName, table.prettyPrint(), schema.toString(true)));
}
if (column.isGenerated()) {
// Spanner will compute generated column values automatically.
continue;
}
Schema avroFieldSchema = field.schema();
if (avroFieldSchema.getType() == Schema.Type.UNION) {
Schema unpacked = AvroUtil.unpackNullable(avroFieldSchema);
if (unpacked != null) {
avroFieldSchema = unpacked;
}
}
LogicalType logicalType = LogicalTypes.fromSchema(avroFieldSchema);
Schema.Type avroType = avroFieldSchema.getType();
switch (column.type().getCode()) {
case BOOL:
case PG_BOOL:
builder.set(column.name()).to(readBool(record, avroType, fieldName).orElse(null));
break;
case INT64:
case PG_INT8:
case ENUM:
builder.set(column.name()).to(readInt64(record, avroType, fieldName).orElse(null));
break;
case FLOAT32:
case PG_FLOAT4:
builder.set(column.name()).to(readFloat32(record, avroType, fieldName).orElse(null));
break;
case FLOAT64:
case PG_FLOAT8:
builder.set(column.name()).to(readFloat64(record, avroType, fieldName).orElse(null));
break;
case STRING:
case PG_VARCHAR:
case PG_TEXT:
case JSON:
case PG_JSONB:
case UUID:
case PG_UUID:
builder.set(column.name()).to(readString(record, avroType, fieldName).orElse(null));
break;
case BYTES:
case PG_BYTEA:
case PROTO:
builder.set(column.name()).to(readBytes(record, avroType, fieldName).orElse(null));
break;
case TIMESTAMP:
case PG_TIMESTAMPTZ:
case PG_SPANNER_COMMIT_TIMESTAMP:
builder
.set(column.name())
.to(readTimestamp(record, avroType, logicalType, fieldName).orElse(null));
break;
case DATE:
case PG_DATE:
builder
.set(column.name())
.to(readDate(record, avroType, logicalType, fieldName).orElse(null));
break;
case NUMERIC:
builder.set(column.name()).to(readNumeric(record, avroType, fieldName).orElse(null));
break;
case PG_NUMERIC:
builder
.set(column.name())
.to(Value.pgNumeric(readPgNumeric(record, avroType, fieldName).orElse(null)));
break;
case ARRAY:
case PG_ARRAY:
{
Schema arraySchema = avroFieldSchema.getElementType();
if (arraySchema.getType() == Schema.Type.UNION) {
Schema unpacked = AvroUtil.unpackNullable(arraySchema);
if (unpacked != null) {
arraySchema = unpacked;
}
}
LogicalType arrayLogicalType = LogicalTypes.fromSchema(arraySchema);
Schema.Type arrayType = arraySchema.getType();
switch (column.type().getArrayElementType().getCode()) {
case BOOL:
case PG_BOOL:
builder
.set(column.name())
.toBoolArray(readBoolArray(record, arrayType, fieldName).orElse(null));
break;
case INT64:
case PG_INT8:
case ENUM:
builder
.set(column.name())
.toInt64Array(readInt64Array(record, arrayType, fieldName).orElse(null));
break;
case FLOAT32:
case PG_FLOAT4:
builder
.set(column.name())
.toFloat32Array(readFloat32Array(record, arrayType, fieldName).orElse(null));
break;
case FLOAT64:
case PG_FLOAT8:
builder
.set(column.name())
.toFloat64Array(readFloat64Array(record, arrayType, fieldName).orElse(null));
break;
case STRING:
case PG_VARCHAR:
case PG_TEXT:
case JSON:
case UUID:
case PG_UUID:
builder
.set(column.name())
.toStringArray(readStringArray(record, arrayType, fieldName).orElse(null));
break;
case PG_JSONB:
builder
.set(column.name())
.toPgJsonbArray(readStringArray(record, arrayType, fieldName).orElse(null));
break;
case BYTES:
case PG_BYTEA:
case PROTO:
builder
.set(column.name())
.toBytesArray(readBytesArray(record, arrayType, fieldName).orElse(null));
break;
case TIMESTAMP:
case PG_TIMESTAMPTZ:
case PG_SPANNER_COMMIT_TIMESTAMP:
builder
.set(column.name())
.toTimestampArray(
readTimestampArray(record, arrayType, arrayLogicalType, fieldName)
.orElse(null));
break;
case DATE:
case PG_DATE:
builder
.set(column.name())
.toDateArray(readDateArray(record, arrayType, fieldName).orElse(null));
break;
case NUMERIC:
builder
.set(column.name())
.toStringArray(readNumericArray(record, arrayType, fieldName).orElse(null));
break;
case PG_NUMERIC:
builder
.set(column.name())
.toPgNumericArray(
readPgNumericArray(record, arrayType, fieldName).orElse(null));
break;
default:
throw new IllegalArgumentException(
String.format(
"Cannot convert field %s in schema %s table %s",
fieldName, schema.toString(true), table.prettyPrint()));
}
break;
}
default:
throw new IllegalArgumentException(
String.format(
"Cannot convert field %s in schema %s table %s",
fieldName, schema.toString(true), table.prettyPrint()));
}
}
return builder.build();
}