in v1/src/main/java/com/google/cloud/teleport/spanner/SpannerRecordConverter.java [166:414]
public GenericRecord convert(Struct row) {
synchronized (this.fields) {
if (!fieldsColumnIndicesInitialized) {
this.fields.stream()
.forEach(
fieldInfo -> {
if (!fieldInfo.generated) {
fieldInfo.setColumnIndex(row);
}
});
fieldsColumnIndicesInitialized = true;
}
}
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
for (FieldInfo fieldInfo : this.fields) {
if (fieldInfo.isGenerated()) {
// Generated column values are not exported.
continue;
}
fieldInfo.checkSupported();
Schema.Field field = fieldInfo.getField();
String fieldName = fieldInfo.getName();
Schema type = fieldInfo.getType();
String spannerType = fieldInfo.getSpannerType();
int fieldIndex = fieldInfo.getColumnIndex();
boolean nullValue = row.isNull(fieldIndex);
if (nullValue && !fieldInfo.isNullable()) {
throw new IllegalArgumentException("Unexpected null value for field " + fieldName);
}
switch (type.getType()) {
case BOOLEAN:
builder.set(field, nullValue ? null : row.getBoolean(fieldIndex));
break;
case LONG:
if ((dialect == Dialect.GOOGLE_STANDARD_SQL && spannerType.equals("TIMESTAMP"))
|| (dialect == Dialect.POSTGRESQL
&& (spannerType.equals("timestamp with time zone")
|| spannerType.equals("spanner.commit_timestamp")))) {
long microSeconds = 0L;
if (!nullValue) {
Timestamp ts = row.getTimestamp(fieldIndex);
microSeconds =
TimeUnit.SECONDS.toMicros(ts.getSeconds())
+ TimeUnit.NANOSECONDS.toMicros(ts.getNanos());
}
builder.set(field, nullValue ? null : microSeconds);
} else {
builder.set(field, nullValue ? null : row.getLong(fieldIndex));
}
break;
case FLOAT:
builder.set(field, nullValue ? null : row.getFloat(fieldIndex));
break;
case DOUBLE:
builder.set(field, nullValue ? null : row.getDouble(fieldIndex));
break;
case BYTES:
if (dialect == Dialect.GOOGLE_STANDARD_SQL && spannerType.equals("NUMERIC")) {
// TODO: uses row.getNumeric() once teleport uses new spanner library.
builder.set(
field,
nullValue
? null
: ByteBuffer.wrap(
NumericUtils.stringToBytes(row.getBigDecimal(fieldIndex).toString())));
break;
}
if (dialect == Dialect.POSTGRESQL && spannerType.equals("numeric")) {
builder.set(
field,
nullValue
? null
: ByteBuffer.wrap(NumericUtils.pgStringToBytes(row.getString(fieldIndex))));
break;
}
builder.set(
field, nullValue ? null : ByteBuffer.wrap(row.getBytes(fieldIndex).toByteArray()));
break;
case STRING:
if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
if (fieldInfo.matchesStringPattern() || spannerType.equals("JSON")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
} else if (spannerType.equals("TIMESTAMP")) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("DATE")) {
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
} else if (spannerType.equals("UUID")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
}
} else if (dialect == Dialect.POSTGRESQL) {
if (spannerType.equals("jsonb")) {
builder.set(field, nullValue ? null : row.getPgJsonb(fieldIndex));
} else if (fieldInfo.matchesVarcharPattern() || spannerType.equals("text")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
} else if (spannerType.equals("timestamp with time zone")) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("spanner.commit_timestamp")) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("date")) {
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
} else if (spannerType.equals("uuid")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
}
}
break;
case ARRAY:
{
Schema arrayType = type.getElementType();
boolean arrayElementNullable = arrayType.getType() == Schema.Type.UNION;
if (!arrayElementNullable) {
throw new IllegalArgumentException(
"Unsupported type for field "
+ fieldName
+ ". Cloud Spanner only supports nullable array values");
}
arrayType = AvroUtil.unpackNullable(arrayType);
if (arrayType == null) {
throw new IllegalArgumentException("Unsupported type for field " + fieldName);
}
switch (arrayType.getType()) {
case BOOLEAN:
builder.set(field, nullValue ? null : row.getBooleanList(fieldIndex));
break;
case LONG:
if ((dialect == Dialect.GOOGLE_STANDARD_SQL
&& spannerType.equals("ARRAY<TIMESTAMP>"))
|| (dialect == Dialect.POSTGRESQL
&& spannerType.equals("timestamp with time zone[]"))) {
List<Long> values =
row.getTimestampList(fieldIndex).stream()
.map(
timestamp ->
timestamp == null
? null
: (TimeUnit.SECONDS.toMicros(timestamp.getSeconds())
+ TimeUnit.NANOSECONDS.toMicros(timestamp.getNanos())))
.collect(Collectors.toList());
builder.set(field, nullValue ? null : values);
} else {
builder.set(field, nullValue ? null : row.getLongList(fieldIndex));
}
break;
case FLOAT:
{
builder.set(field, nullValue ? null : row.getFloatList(fieldIndex));
break;
}
case DOUBLE:
{
builder.set(field, nullValue ? null : row.getDoubleList(fieldIndex));
break;
}
case BYTES:
{
if (dialect == Dialect.GOOGLE_STANDARD_SQL
&& spannerType.equals("ARRAY<NUMERIC>")) {
if (nullValue) {
builder.set(field, null);
break;
}
List<ByteBuffer> numericValues = null;
numericValues =
row.getStringList(fieldIndex).stream()
.map(
numeric ->
numeric == null
? null
: ByteBuffer.wrap(NumericUtils.stringToBytes(numeric)))
.collect(Collectors.toList());
builder.set(field, numericValues);
break;
}
if (dialect == Dialect.POSTGRESQL && spannerType.equals("numeric[]")) {
if (nullValue) {
builder.set(field, null);
break;
}
List<ByteBuffer> numericValues = null;
numericValues =
row.getStringList(fieldIndex).stream()
.map(
numeric ->
numeric == null
? null
: ByteBuffer.wrap(NumericUtils.pgStringToBytes(numeric)))
.collect(Collectors.toList());
builder.set(field, numericValues);
break;
}
List<ByteBuffer> value = null;
if (!nullValue) {
value =
row.getBytesList(fieldIndex).stream()
.map(
bytes ->
bytes == null ? null : ByteBuffer.wrap(bytes.toByteArray()))
.collect(Collectors.toList());
}
builder.set(field, value);
break;
}
case STRING:
{
if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
if (fieldInfo.matchesArrayPattern()
|| spannerType.equals("ARRAY<JSON>")
|| spannerType.equals("ARRAY<UUID>")) {
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
} else if (spannerType.equals("ARRAY<TIMESTAMP>")) {
setTimestampArray(row, builder, field, fieldIndex, nullValue);
} else if (spannerType.equals("ARRAY<DATE>")) {
setDateArray(row, builder, field, fieldIndex, nullValue);
}
}
if (dialect == Dialect.POSTGRESQL) {
if (spannerType.equals("jsonb[]")) {
builder.set(field, nullValue ? null : row.getPgJsonbList(fieldIndex));
} else if (fieldInfo.matchesVarcharArrayPattern()
|| spannerType.equals("text[]")
|| spannerType.equals("uuid[]")) {
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
} else if (spannerType.equals("timestamp with time zone[]")) {
setTimestampArray(row, builder, field, fieldIndex, nullValue);
} else if (spannerType.equals("date[]")) {
setDateArray(row, builder, field, fieldIndex, nullValue);
}
}
break;
}
default:
{
throw new IllegalArgumentException("Unsupported array type " + arrayType);
}
}
break;
}
default:
{
throw new IllegalArgumentException("Unsupported type" + type);
}
}
}
return builder.build();
}