in v1/src/main/java/com/google/cloud/teleport/spanner/CSVRecordToMutation.java [122:290]
/**
 * Converts one parsed CSV record into a Spanner {@link Mutation} by reading the record cell by
 * cell and setting each cell on {@code builder} under the matching column name.
 *
 * <p>The column at position {@code i} is named after {@code manifestColumns.get(i)} when a
 * non-empty manifest column list is supplied, and after {@code table.columns().get(i)}
 * otherwise. The column type is always looked up in {@code table} by that name. A cell that is
 * {@code null} or empty is written as a NULL value for every type except STRING / PG_VARCHAR /
 * PG_TEXT, where the cell content is passed through unchanged.
 *
 * @param builder the mutation builder that receives one value per cell of {@code row}
 * @param row the parsed CSV record; may contain fewer cells than the table has columns
 * @param table the schema used to resolve column names and types
 * @param manifestColumns optional column list from the import manifest; {@code null} or empty
 *     means "use the column order of {@code table}"
 * @return the mutation built from {@code builder} after all cells were set
 * @throws IllegalArgumentException if a BOOL cell is neither "true" nor "false" (ignoring case
 *     and surrounding whitespace) or a column has a type code that is not handled here
 * @throws RuntimeException if {@code row} has more cells than the table or the manifest column
 *     list has columns
 */
protected final Mutation parseRow(
    Mutation.WriteBuilder builder,
    CSVRecord row,
    Table table,
    List<TableManifest.Column> manifestColumns)
    throws IllegalArgumentException {
  // The input row's column count could be less than or equal to that of DB schema's.
  if (row.size() > table.columns().size()) {
    throw new RuntimeException(
        String.format(
            "Parsed row's column count is larger than that of the schema's. "
                + "Row size: %d, Column size: %d, Row content: %s",
            row.size(), table.columns().size(), row.toString()));
  }
  // Evaluate the manifest check once, null-safely, so the size validation below and the column
  // name lookup in the loop agree on whether the manifest is in use.
  boolean useManifest = manifestColumns != null && !manifestColumns.isEmpty();
  if (useManifest && row.size() > manifestColumns.size()) {
    throw new RuntimeException(
        String.format(
            "Parsed row's column count is larger than that of the manifest's column list. "
                + "Row size: %d, Manifest column size: %d, Row content: %s",
            row.size(), manifestColumns.size(), row.toString()));
  }
  // Extract cell by cell and construct Mutation object
  for (int i = 0; i < row.size(); i++) {
    // If column info is provided in manifest, we use the name from manifest.
    // Otherwise, we use the column name read from DB.
    String columnName =
        useManifest ? manifestColumns.get(i).getColumnName() : table.columns().get(i).name();
    com.google.cloud.teleport.spanner.common.Type columnType = table.column(columnName).type();
    String cellValue = row.get(i);
    boolean isNullValue = Strings.isNullOrEmpty(cellValue);
    Value columnValue = null;
    // TODO: make the tests below match Spanner's SQL literal rules wherever possible,
    // in terms of how input is accepted, and throw exceptions on invalid input.
    switch (columnType.getCode()) {
      case BOOL:
      case PG_BOOL:
        if (isNullValue) {
          columnValue = Value.bool(null);
        } else {
          Boolean bCellValue;
          String trimmedBool = cellValue.trim();
          if (trimmedBool.equalsIgnoreCase("true")) {
            bCellValue = Boolean.TRUE;
          } else if (trimmedBool.equalsIgnoreCase("false")) {
            bCellValue = Boolean.FALSE;
          } else {
            throw new IllegalArgumentException(
                trimmedBool + " is not recognizable value " + "for BOOL type");
          }
          // Store the validated value. Re-parsing the untrimmed cell with Boolean.valueOf
          // would turn a padded " true " into false.
          columnValue = Value.bool(bCellValue);
        }
        break;
      case INT64:
      case PG_INT8:
        columnValue =
            isNullValue ? Value.int64(null) : Value.int64(Long.valueOf(cellValue.trim()));
        break;
      case FLOAT32:
      case PG_FLOAT4:
        columnValue =
            isNullValue ? Value.float32(null) : Value.float32(Float.valueOf(cellValue.trim()));
        break;
      case FLOAT64:
      case PG_FLOAT8:
        columnValue =
            isNullValue ? Value.float64(null) : Value.float64(Double.valueOf(cellValue.trim()));
        break;
      case STRING:
      case PG_VARCHAR:
      case PG_TEXT:
        // Strings are passed through untouched: no trimming and no empty-to-NULL conversion.
        columnValue = Value.string(cellValue);
        break;
      case UUID:
      case PG_UUID:
        columnValue = Value.string(isNullValue ? null : cellValue);
        break;
      case DATE:
      case PG_DATE:
        if (isNullValue) {
          columnValue = Value.date(null);
        } else {
          // Use the configured date pattern when present, otherwise the default pattern below.
          LocalDate dt =
              LocalDate.parse(
                  cellValue.trim(),
                  DateTimeFormatter.ofPattern(
                      dateFormat.get() == null
                          ? "yyyy-M[M]-d[d][' 00:00:00']"
                          : dateFormat.get()));
          columnValue =
              Value.date(
                  com.google.cloud.Date.fromYearMonthDay(
                      dt.getYear(), dt.getMonthValue(), dt.getDayOfMonth()));
        }
        break;
      case TIMESTAMP:
      case PG_TIMESTAMPTZ:
      case PG_SPANNER_COMMIT_TIMESTAMP:
        if (isNullValue) {
          columnValue = Value.timestamp(null);
        } else {
          // Timestamp is either a long integer representing Unix epoch time (interpreted as
          // microseconds) or a string, which will be parsed using the pattern corresponding to
          // the timestampFormat flag. Both paths work on the trimmed cell so that a padded
          // epoch value is handled the same way as the other numeric types.
          String trimmedTimestamp = cellValue.trim();
          Long microseconds = Longs.tryParse(trimmedTimestamp);
          if (microseconds != null) {
            columnValue =
                Value.timestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(microseconds));
          } else {
            DateTimeFormatter formatter =
                timestampFormat.get() == null
                    ? DateTimeFormatter.ISO_INSTANT
                    : DateTimeFormatter.ofPattern(timestampFormat.get());
            TemporalAccessor temporalAccessor = formatter.parse(trimmedTimestamp);
            Instant ts;
            try {
              ts = Instant.from(temporalAccessor);
            } catch (DateTimeException e) {
              // Date format may not be converted because it lacks timezone, retry with UTC
              LocalDateTime localDateTime = LocalDateTime.from(temporalAccessor);
              ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, ZoneOffset.UTC);
              ts = Instant.from(zonedDateTime);
            }
            columnValue =
                Value.timestamp(
                    com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
                        ts.getEpochSecond(), ts.getNano()));
          }
        }
        break;
      case NUMERIC:
      case JSON:
      case PG_JSONB:
        // These types are handed to the mutation as (trimmed) string values.
        columnValue = isNullValue ? Value.string(null) : Value.string(cellValue.trim());
        break;
      case PG_NUMERIC:
        columnValue = isNullValue ? Value.pgNumeric(null) : Value.pgNumeric(cellValue.trim());
        break;
      case BYTES:
      case PG_BYTEA:
        // Binary cells are expected to be base64 encoded.
        columnValue =
            isNullValue ? Value.bytes(null) : Value.bytes(ByteArray.fromBase64(cellValue.trim()));
        break;
      case PROTO:
        columnValue =
            isNullValue
                ? Value.protoMessage(null, columnType.getProtoTypeFqn())
                : Value.protoMessage(
                    ByteArray.fromBase64(cellValue.trim()), columnType.getProtoTypeFqn());
        break;
      case ENUM:
        // Enum cells carry the numeric enum value, not the enum name.
        columnValue =
            isNullValue
                ? Value.protoEnum(null, columnType.getProtoTypeFqn())
                : Value.protoEnum(Long.valueOf(cellValue.trim()), columnType.getProtoTypeFqn());
        break;
      default:
        throw new IllegalArgumentException(
            "Unrecognized column data type: " + columnType.getCode());
    }
    builder.set(columnName).to(columnValue);
  }
  return builder.build();
}