protected final Mutation parseRow()

in v1/src/main/java/com/google/cloud/teleport/spanner/CSVRecordToMutation.java [122:290]


  protected final Mutation parseRow(
      Mutation.WriteBuilder builder,
      CSVRecord row,
      Table table,
      List<TableManifest.Column> manifestColumns)
      throws IllegalArgumentException {
    // The input row's column count could be less than or equal to that of DB schema's.
    if (row.size() > table.columns().size()) {
      throw new RuntimeException(
          String.format(
              "Parsed row's column count is larger than that of the schema's. "
                  + "Row size: %d, Column size: %d, Row content: %s",
              row.size(), table.columns().size(), row.toString()));
    }

    if (manifestColumns.size() > 0 && row.size() > manifestColumns.size()) {
      throw new RuntimeException(
          String.format(
              "Parsed row's column count is larger than that of the manifest's column list. "
                  + "Row size: %d, Manifest column size: %d, Row content: %s",
              row.size(), manifestColumns.size(), row.toString()));
    }

    // Extract cell by cell and construct Mutation object
    for (int i = 0; i < row.size(); i++) {
      // If column info is provided in manifest, we use the name from manifest.
      // Otherwise, we use the column name read from DB.
      String columnName =
          manifestColumns != null && manifestColumns.size() > 0
              ? manifestColumns.get(i).getColumnName()
              : table.columns().get(i).name();
      com.google.cloud.teleport.spanner.common.Type columnType = table.column(columnName).type();
      String cellValue = row.get(i);
      boolean isNullValue = Strings.isNullOrEmpty(cellValue);
      Value columnValue = null;
      // TODO: make the tests below match Spanner's SQL literal rules wherever possible,
      // in terms of how input is accepted, and throw exceptions on invalid input.
      switch (columnType.getCode()) {
        case BOOL:
        case PG_BOOL:
          if (isNullValue) {
            columnValue = Value.bool(null);
          } else {
            Boolean bCellValue;
            if (cellValue.trim().equalsIgnoreCase("true")) {
              bCellValue = Boolean.TRUE;
            } else if (cellValue.trim().equalsIgnoreCase("false")) {
              bCellValue = Boolean.FALSE;
            } else {
              throw new IllegalArgumentException(
                  cellValue.trim() + " is not recognizable value " + "for BOOL type");
            }
            columnValue = Value.bool(Boolean.valueOf(cellValue));
          }
          break;
        case INT64:
        case PG_INT8:
          columnValue =
              isNullValue ? Value.int64(null) : Value.int64(Long.valueOf(cellValue.trim()));
          break;
        case FLOAT32:
        case PG_FLOAT4:
          columnValue =
              isNullValue ? Value.float32(null) : Value.float32(Float.valueOf(cellValue.trim()));
          break;
        case FLOAT64:
        case PG_FLOAT8:
          columnValue =
              isNullValue ? Value.float64(null) : Value.float64(Double.valueOf(cellValue.trim()));
          break;
        case STRING:
        case PG_VARCHAR:
        case PG_TEXT:
          columnValue = Value.string(cellValue);
          break;
        case UUID:
        case PG_UUID:
          columnValue = Value.string(isNullValue ? null : cellValue);
          break;
        case DATE:
        case PG_DATE:
          if (isNullValue) {
            columnValue = Value.date(null);
          } else {
            LocalDate dt =
                LocalDate.parse(
                    cellValue.trim(),
                    DateTimeFormatter.ofPattern(
                        dateFormat.get() == null
                            ? "yyyy-M[M]-d[d][' 00:00:00']"
                            : dateFormat.get()));
            columnValue =
                Value.date(
                    com.google.cloud.Date.fromYearMonthDay(
                        dt.getYear(), dt.getMonthValue(), dt.getDayOfMonth()));
          }
          break;
        case TIMESTAMP:
        case PG_TIMESTAMPTZ:
        case PG_SPANNER_COMMIT_TIMESTAMP:
          if (isNullValue) {
            columnValue = Value.timestamp(null);
          } else {
            // Timestamp is either a long integer representing Unix epoch time or a string, which
            // will be parsed using the pattern corresponding to the timestampFormat flag.
            Long microseconds = Longs.tryParse(cellValue);
            if (microseconds != null) {
              columnValue =
                  Value.timestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(microseconds));
            } else {
              DateTimeFormatter formatter =
                  timestampFormat.get() == null
                      ? DateTimeFormatter.ISO_INSTANT
                      : DateTimeFormatter.ofPattern(timestampFormat.get());
              TemporalAccessor temporalAccessor = formatter.parse(cellValue.trim());

              Instant ts;
              try {
                ts = Instant.from(temporalAccessor);
              } catch (DateTimeException e) {
                // Date format may not be converted because it lacks timezone, retry with UTC
                LocalDateTime localDateTime = LocalDateTime.from(temporalAccessor);
                ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, ZoneOffset.UTC);
                ts = Instant.from(zonedDateTime);
              }

              columnValue =
                  Value.timestamp(
                      com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
                          ts.getEpochSecond(), ts.getNano()));
            }
          }
          break;
        case NUMERIC:
        case JSON:
        case PG_JSONB:
          columnValue = isNullValue ? Value.string(null) : Value.string(cellValue.trim());
          break;
        case PG_NUMERIC:
          columnValue = isNullValue ? Value.pgNumeric(null) : Value.pgNumeric(cellValue.trim());
          break;
        case BYTES:
        case PG_BYTEA:
          columnValue =
              isNullValue ? Value.bytes(null) : Value.bytes(ByteArray.fromBase64(cellValue.trim()));
          break;
        case PROTO:
          columnValue =
              isNullValue
                  ? Value.protoMessage(null, columnType.getProtoTypeFqn())
                  : Value.protoMessage(
                      ByteArray.fromBase64(cellValue.trim()), columnType.getProtoTypeFqn());
          break;
        case ENUM:
          columnValue =
              isNullValue
                  ? Value.protoEnum(null, columnType.getProtoTypeFqn())
                  : Value.protoEnum(Long.valueOf(cellValue.trim()), columnType.getProtoTypeFqn());
          break;
        default:
          throw new IllegalArgumentException(
              "Unrecognized column data type: " + columnType.getCode());
      }

      builder.set(columnName).to(columnValue);
    }

    return builder.build();
  }