in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/MutationUtils.java [181:280]
/**
 * Binds the value stored under {@code columnName} in {@code row} to the same-named column of
 * {@code mutationBuilder}. The {@code Row} accessor and the binding form are selected from the
 * type name of {@code fieldType}:
 *
 * <ul>
 *   <li>INT64, FLOAT, DOUBLE, BOOLEAN, STRING: the accessor result is handed to the binder as-is.
 *   <li>BYTE, INT16, INT32: bound as a {@code long}; a null value is bound as a null {@code Long}.
 *   <li>DECIMAL: a null value is not bound; it is passed to {@code checkNotNull} instead.
 *   <li>DATETIME: bound as a {@code Timestamp} built from the epoch milliseconds times 1000; a
 *       null value is bound as a null {@code Timestamp}.
 *   <li>BYTES: bound as a {@code ByteArray} copy; a null value is bound as a null {@code
 *       ByteArray}.
 *   <li>ROW: bound as the result of {@code beamRowToStruct} (or null), typed by {@code
 *       beamTypeToSpannerType} applied to the column's schema field type.
 *   <li>ARRAY, ITERABLE: delegated to {@code addIterableToMutationBuilder}.
 * </ul>
 *
 * @param mutationBuilder builder receiving the column value
 * @param fieldType Beam field type used to pick the conversion
 * @param columnName name of the field in {@code row} and of the column being set
 * @param row row the value is read from
 * @throws IllegalArgumentException if the type name of {@code fieldType} is not listed above
 */
private static void setBeamValueToMutation(
    Mutation.WriteBuilder mutationBuilder,
    Schema.FieldType fieldType,
    String columnName,
    Row row) {
  switch (fieldType.getTypeName()) {
      // Values forwarded to the binder exactly as the Row accessor returns them.
      // NOTE(review): a null result is forwarded too; how it is bound depends on the overload
      // that resolves for the boxed type (FLOAT in particular) - confirm against the client
      // library version in use.
    case INT64:
      mutationBuilder.set(columnName).to(row.getInt64(columnName));
      break;
    case FLOAT:
      mutationBuilder.set(columnName).to(row.getFloat(columnName));
      break;
    case DOUBLE:
      mutationBuilder.set(columnName).to(row.getDouble(columnName));
      break;
    case BOOLEAN:
      mutationBuilder.set(columnName).to(row.getBoolean(columnName));
      break;
    case STRING:
      mutationBuilder.set(columnName).to(row.getString(columnName));
      break;

      // Narrow integer types share one code path: widen to long, or bind a null Long.
    case BYTE:
      setIntegralValueToMutation(mutationBuilder, columnName, row.getByte(columnName));
      break;
    case INT16:
      setIntegralValueToMutation(mutationBuilder, columnName, row.getInt16(columnName));
      break;
    case INT32:
      setIntegralValueToMutation(mutationBuilder, columnName, row.getInt32(columnName));
      break;

    case DECIMAL:
      @Nullable BigDecimal numeric = row.getDecimal(columnName);
      if (numeric != null) {
        mutationBuilder.set(columnName).to(numeric);
      } else {
        // A null decimal is deliberately not bound; checkNotNull receives the null reference.
        checkNotNull(numeric, "Null decimal at column " + columnName);
      }
      break;

      // TODO: Implement logical date and datetime
    case DATETIME:
      @Nullable ReadableDateTime instantValue = row.getDateTime(columnName);
      mutationBuilder
          .set(columnName)
          .to(
              instantValue == null
                  ? (Timestamp) null
                  // Epoch milliseconds scaled to the microseconds the factory expects.
                  : Timestamp.ofTimeMicroseconds(instantValue.toInstant().getMillis() * 1000L));
      break;

    case BYTES:
      byte @Nullable [] rawBytes = row.getBytes(columnName);
      mutationBuilder
          .set(columnName)
          .to(rawBytes == null ? (ByteArray) null : ByteArray.copyFrom(rawBytes));
      break;

    case ROW:
      @Nullable Row nestedRow = row.getRow(columnName);
      // The struct type is always derived from the schema; only the value may be null.
      mutationBuilder
          .set(columnName)
          .to(
              beamTypeToSpannerType(row.getSchema().getField(columnName).getType()),
              nestedRow == null ? null : beamRowToStruct(nestedRow));
      break;

      // Both collection kinds are handled by the same helper, fed from different accessors.
    case ARRAY:
      addIterableToMutationBuilder(
          mutationBuilder, row.getArray(columnName), row.getSchema().getField(columnName));
      break;
    case ITERABLE:
      addIterableToMutationBuilder(
          mutationBuilder, row.getIterable(columnName), row.getSchema().getField(columnName));
      break;

    default:
      throw new IllegalArgumentException(
          String.format("Unsupported field type: %s", fieldType.getTypeName()));
  }
}

/**
 * Binds a boxed BYTE/INT16/INT32 value to {@code columnName} as a {@code long}, or binds a null
 * {@code Long} when {@code value} is null.
 */
private static void setIntegralValueToMutation(
    Mutation.WriteBuilder mutationBuilder, String columnName, @Nullable Number value) {
  if (value != null) {
    mutationBuilder.set(columnName).to(value.longValue());
  } else {
    mutationBuilder.set(columnName).to((Long) null);
  }
}