in v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java [542:626]
static String handleLogicalFieldType(
String fieldName,
Object recordValue,
Schema fieldSchema,
CassandraAnnotations cassandraAnnotations) {
LOG.debug("found logical type for col {} with schema {}", fieldName, fieldSchema);
if (recordValue == null) {
return null;
}
if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
TimeConversions.DateConversion dateConversion = new TimeConversions.DateConversion();
LocalDate date =
dateConversion.fromInt(
Integer.valueOf(recordValue.toString()), fieldSchema, fieldSchema.getLogicalType());
return date.format(DateTimeFormatter.ISO_LOCAL_DATE);
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
BigDecimal bigDecimal =
decimalConversion.fromBytes(
(ByteBuffer) recordValue, fieldSchema, fieldSchema.getLogicalType());
return bigDecimal.toPlainString();
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.TimeMicros) {
Long nanoseconds = Long.valueOf(recordValue.toString()) * TimeUnit.MICROSECONDS.toNanos(1);
return LocalTime.ofNanoOfDay(nanoseconds).format(DateTimeFormatter.ISO_LOCAL_TIME);
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.TimeMillis) {
Long nanoseconds = TimeUnit.MILLISECONDS.toNanos(Long.valueOf(recordValue.toString()));
return LocalTime.ofNanoOfDay(nanoseconds).format(DateTimeFormatter.ISO_LOCAL_TIME);
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
// We cannot convert to nanoseconds directly here since that can overflow for Long.
Long micros = Long.valueOf(recordValue.toString());
Instant timestamp =
Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(micros),
(micros % TimeUnit.SECONDS.toMicros(1)) * TimeUnit.MICROSECONDS.toNanos(1));
return timestamp.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
Instant timestamp = Instant.ofEpochMilli(Long.valueOf(recordValue.toString()));
return timestamp.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
} else if (fieldSchema.getProp(LOGICAL_TYPE) != null
&& fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.JSON)) {
if (cassandraAnnotations.cassandraType().getKind().equals(Kind.MAP)) {
return AvroJsonToCassandraMapConvertor.handleJsonToMap(
recordValue, cassandraAnnotations, fieldName, fieldSchema);
} else {
return recordValue.toString();
}
} else if (fieldSchema.getProp(LOGICAL_TYPE) != null
&& fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.NUMBER)) {
return recordValue.toString();
} else if (fieldSchema.getProp(LOGICAL_TYPE) != null
&& fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.VARCHAR)) {
return recordValue.toString();
} else if (fieldSchema.getProp(LOGICAL_TYPE) != null
&& fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.TIME_INTERVAL)) {
Long timeMicrosTotal = Long.valueOf(recordValue.toString());
boolean isNegative = false;
if (timeMicrosTotal < 0) {
timeMicrosTotal *= -1;
isNegative = true;
}
Long nanoseconds = timeMicrosTotal * TimeUnit.MICROSECONDS.toNanos(1);
Long hours = TimeUnit.NANOSECONDS.toHours(nanoseconds);
nanoseconds -= TimeUnit.HOURS.toNanos(hours);
Long minutes = TimeUnit.NANOSECONDS.toMinutes(nanoseconds);
nanoseconds -= TimeUnit.MINUTES.toNanos(minutes);
Long seconds = TimeUnit.NANOSECONDS.toSeconds(nanoseconds);
nanoseconds -= TimeUnit.SECONDS.toNanos(seconds);
Long micros = TimeUnit.NANOSECONDS.toMicros(nanoseconds);
// Pad 0 if single digit hour.
String timeString = (hours < 10) ? String.format("%02d", hours) : String.format("%d", hours);
timeString += String.format(":%02d:%02d", minutes, seconds);
if (micros > 0) {
timeString += String.format(".%d", micros);
}
return isNegative ? "-" + timeString : timeString;
} else if (fieldSchema.getProp(LOGICAL_TYPE) != null
&& fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.UNSUPPORTED)) {
return null;
} else {
LOG.error("Unknown field type {} for field {} in {}.", fieldSchema, fieldName, recordValue);
throw new UnsupportedOperationException(
String.format(
"Unknown field type %s for field %s in %s.", fieldSchema, fieldName, recordValue));
}
}