in v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForLiveIT.java [115:234]
public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
throws InvalidTransformationException {
if (request.getTableName().equals("AllDatatypeTransformation")) {
Map<String, Object> responseRow = new HashMap<>();
Map<String, Object> requestRow = request.getRequestRow();
// Filter event in case "varchar_column" = "example1"
if (requestRow.get("varchar_column").equals("example1")) {
return new MigrationTransformationResponse(null, true);
}
// In case of update/delete events, return request as response without any transformation
if (request.getEventType().equals("UPDATE")) {
return new MigrationTransformationResponse(null, false);
}
if (request.getEventType().equals("DELETE")) {
return new MigrationTransformationResponse(null, true);
}
// In case of INSERT update the values for all the columns in all the rows except the
// filtered row.
Long tinyIntColumn = Long.parseLong((String) requestRow.get("tinyint_column")) + 1;
Long intColumn = Long.parseLong((String) requestRow.get("int_column")) + 1;
Long bigIntColumn = Long.parseLong((String) requestRow.get("bigint_column")) + 1;
Long yearColumn = Long.parseLong((String) requestRow.get("year_column")) + 1;
BigDecimal floatColumn = (BigDecimal) requestRow.get("float_column");
BigDecimal doubleColumn = (BigDecimal) requestRow.get("double_column");
responseRow.put("tinyint_column", tinyIntColumn.toString());
responseRow.put("text_column", "\'" + requestRow.get("text_column") + " append\'");
responseRow.put("int_column", intColumn.toString());
responseRow.put("bigint_column", bigIntColumn.toString());
responseRow.put("float_column", floatColumn.add(BigDecimal.ONE).toString());
responseRow.put("double_column", doubleColumn.add(BigDecimal.ONE).toString());
Double value = Double.parseDouble((String) requestRow.get("decimal_column"));
responseRow.put("decimal_column", String.valueOf(value - 1));
responseRow.put("bool_column", "false");
responseRow.put("enum_column", "\'3\'");
responseRow.put(
"blob_column",
"from_base64(\'"
+ Base64.getEncoder()
.encodeToString("blob_column_appended".getBytes(StandardCharsets.UTF_8))
+ "\')");
responseRow.put(
"binary_column",
"binary(from_base64(\'"
+ Base64.getEncoder()
.encodeToString("binary_column_appended".getBytes(StandardCharsets.UTF_8))
+ "\'))");
responseRow.put(
"bit_column",
"binary(from_base64(\'"
+ Base64.getEncoder().encodeToString("5".getBytes(StandardCharsets.UTF_8))
+ "\'))");
responseRow.put("year_column", "\'" + yearColumn + "\'");
try {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX");
dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // Ensure it handles UTC correctly
Date date = dateFormat.parse((String) requestRow.get("date_column"));
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.DAY_OF_MONTH, 1);
responseRow.put("date_column", "\'" + dateFormat.format(calendar.getTime()) + "\'");
Date dateTime = dateTimeFormat.parse((String) requestRow.get("datetime_column"));
calendar.setTime(dateTime);
calendar.add(Calendar.SECOND, -1);
String dateTimeColumn = dateTimeFormat.format(calendar.getTime());
responseRow.put(
"datetime_column",
"CONVERT_TZ(\'"
+ dateTimeColumn.substring(0, dateTimeColumn.length() - 1)
+ "\','+00:00','+00:00')");
dateTime = dateTimeFormat.parse((String) requestRow.get("timestamp_column"));
calendar.setTime(dateTime);
calendar.add(Calendar.SECOND, -1);
String timestampColumn = dateTimeFormat.format(calendar.getTime());
responseRow.put(
"timestamp_column",
"CONVERT_TZ(\'"
+ timestampColumn.substring(0, timestampColumn.length() - 1)
+ "\','+00:00','+00:00')");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
LocalTime time = LocalTime.parse((String) requestRow.get("time_column"), formatter);
LocalTime newTime = time.plusMinutes(10);
responseRow.put("time_column", "\'" + newTime.format(formatter) + "\'");
} catch (Exception e) {
throw new InvalidTransformationException(e);
}
MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
} else if (request.getTableName().equals("Person")) {
Map<String, Object> responseRow = new HashMap<>();
Map<String, Object> requestRow = request.getRequestRow();
String firstName1 = requestRow.get("first_name1").toString();
String lastName1 = requestRow.get("last_name1").toString();
String firstName2 = requestRow.get("first_name2").toString();
String lastName2 = requestRow.get("last_name2").toString();
String firstName3 = requestRow.get("first_name3").toString();
String lastName3 = requestRow.get("last_name3").toString();
responseRow.put("full_name1", "\'" + firstName1 + " " + lastName1 + "\'");
responseRow.put("full_name2", "\'" + firstName2 + " " + lastName2 + "\'");
responseRow.put("full_name3", "\'" + firstName3 + " " + lastName3 + "\'");
MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
} else if (request.getTableName().equals("Users1")) {
Map<String, Object> responseRow = new HashMap<>();
Map<String, Object> requestRow = request.getRequestRow();
String name = requestRow.get("name").toString();
String[] nameArray = name.split(" ");
responseRow.put("first_name", "\'" + nameArray[0] + "\'");
responseRow.put("last_name", "\'" + nameArray[1] + "\'");
MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
}
return new MigrationTransformationResponse(null, false);
}