in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/StructUtils.java [126:222]
/**
 * Translates a Beam {@link Row} into a Cloud Spanner {@link Struct}.
 *
 * <p>The fields of the row's schema are visited in schema order and each one is written to the
 * struct under the field's own name. Nested rows are converted by calling this method
 * recursively; ARRAY and ITERABLE fields are delegated to {@code addIterableToStructBuilder}.
 * INT16, INT32 and BYTE values are bound through the same null placeholder as 64-bit integers
 * ({@code (Long) null}) when absent.
 *
 * <p>A null DECIMAL value is never written: it is handed to {@code checkNotNull} together with a
 * message naming the column (presumably raising an exception; confirm against the
 * {@code checkNotNull} imported by this file).
 *
 * @param row the Beam row to translate
 * @return a struct containing one column per field of the schema of {@code row}
 * @throws IllegalArgumentException if a field's Beam type is not handled by the switch below
 */
public static Struct beamRowToStruct(Row row) {
  Struct.Builder structBuilder = Struct.newBuilder();
  for (Schema.Field field : row.getSchema().getFields()) {
    String name = field.getName();
    Schema.FieldType fieldType = field.getType();
    switch (fieldType.getTypeName()) {
      case ROW:
        @Nullable Row nested = row.getRow(name);
        if (nested != null) {
          // Recurse so that the nested row becomes a nested struct of the matching Spanner type.
          structBuilder.set(name).to(beamTypeToSpannerType(fieldType), beamRowToStruct(nested));
        } else {
          // A missing nested row is still bound with its struct type, just without a value.
          structBuilder.set(name).to(beamTypeToSpannerType(fieldType), null);
        }
        break;
      case ARRAY:
        addIterableToStructBuilder(structBuilder, row.getArray(name), field);
        break;
      case ITERABLE:
        addIterableToStructBuilder(structBuilder, row.getIterable(name), field);
        break;
      case FLOAT:
        // NOTE(review): unlike INT16/INT32/BYTE there is no explicit null branch here; null
        // handling depends on which ValueBinder overload this call resolves to. TODO confirm.
        structBuilder.set(name).to(row.getFloat(name));
        break;
      case DOUBLE:
        structBuilder.set(name).to(row.getDouble(name));
        break;
      case INT16:
        @Nullable Short shortValue = row.getInt16(name);
        if (shortValue != null) {
          structBuilder.set(name).to(shortValue);
        } else {
          structBuilder.set(name).to((Long) null);
        }
        break;
      case INT32:
        @Nullable Integer intValue = row.getInt32(name);
        if (intValue != null) {
          structBuilder.set(name).to(intValue);
        } else {
          structBuilder.set(name).to((Long) null);
        }
        break;
      case INT64:
        structBuilder.set(name).to(row.getInt64(name));
        break;
      case DECIMAL:
        @Nullable BigDecimal numeric = row.getDecimal(name);
        if (numeric != null) {
          structBuilder.set(name).to(numeric);
        } else {
          // BigDecimal is not nullable
          checkNotNull(numeric, "Null decimal at column " + name);
        }
        break;
        // TODO: implement logical type date and timestamp
      case DATETIME:
        @Nullable ReadableDateTime instant = row.getDateTime(name);
        if (instant != null) {
          // The date-time is round-tripped through its string form to build the Timestamp.
          Timestamp parsed = Timestamp.parseTimestamp(instant.toString());
          structBuilder.set(name).to(parsed);
        } else {
          structBuilder.set(name).to((Timestamp) null);
        }
        break;
      case STRING:
        structBuilder.set(name).to(row.getString(name));
        break;
      case BYTE:
        @Nullable Byte singleByte = row.getByte(name);
        if (singleByte != null) {
          structBuilder.set(name).to(singleByte);
        } else {
          structBuilder.set(name).to((Long) null);
        }
        break;
      case BYTES:
        byte @Nullable [] rawBytes = row.getBytes(name);
        if (rawBytes != null) {
          ByteArray copied = ByteArray.copyFrom(rawBytes);
          structBuilder.set(name).to(copied);
        } else {
          structBuilder.set(name).to((ByteArray) null);
        }
        break;
      case BOOLEAN:
        structBuilder.set(name).to(row.getBoolean(name));
        break;
      default:
        throw new IllegalArgumentException(
            String.format(
                "Unsupported beam type '%s' while translating row to struct.",
                fieldType.getTypeName()));
    }
  }
  return structBuilder.build();
}