public GenericRecord convert()

in v1/src/main/java/com/google/cloud/teleport/spanner/SpannerRecordConverter.java [166:414]


  /**
   * Converts one Spanner {@link Struct} row into a {@link GenericRecord} built against this
   * converter's {@code schema}.
   *
   * <p>On the first call the column index of every non-generated field is resolved from the given
   * row (under a lock on {@code fields}) and reused for subsequent rows. Generated columns are
   * skipped and never written to the record.
   *
   * @param row the Spanner row to convert
   * @return the record populated with the values of all non-generated fields
   * @throws IllegalArgumentException if a non-nullable field holds a null value, if an array field
   *     has non-nullable elements, or if a field (or array element) type is not handled here
   */
  public GenericRecord convert(Struct row) {
    // Resolve column indices once; the flag is read and written only while holding the lock.
    synchronized (this.fields) {
      if (!fieldsColumnIndicesInitialized) {
        for (FieldInfo fieldInfo : this.fields) {
          // Same predicate as the conversion loop below, so every field that is read later has
          // had its index resolved here.
          if (!fieldInfo.isGenerated()) {
            fieldInfo.setColumnIndex(row);
          }
        }
        fieldsColumnIndicesInitialized = true;
      }
    }

    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    for (FieldInfo fieldInfo : this.fields) {
      if (fieldInfo.isGenerated()) {
        // Generated column values are not exported.
        continue;
      }

      fieldInfo.checkSupported();

      Schema.Field field = fieldInfo.getField();
      String fieldName = fieldInfo.getName();
      Schema type = fieldInfo.getType();
      String spannerType = fieldInfo.getSpannerType();

      int fieldIndex = fieldInfo.getColumnIndex();

      boolean nullValue = row.isNull(fieldIndex);
      if (nullValue && !fieldInfo.isNullable()) {
        throw new IllegalArgumentException("Unexpected null value for field " + fieldName);
      }
      // Throughout the switch, typed getters are only invoked when the column is non-null.
      switch (type.getType()) {
        case BOOLEAN:
          builder.set(field, nullValue ? null : row.getBoolean(fieldIndex));
          break;
        case LONG:
          if ((dialect == Dialect.GOOGLE_STANDARD_SQL && spannerType.equals("TIMESTAMP"))
              || (dialect == Dialect.POSTGRESQL
                  && (spannerType.equals("timestamp with time zone")
                      || spannerType.equals("spanner.commit_timestamp")))) {
            // Timestamps stored in a LONG field are written as microseconds.
            Long microSeconds = null;
            if (!nullValue) {
              Timestamp ts = row.getTimestamp(fieldIndex);
              microSeconds =
                  TimeUnit.SECONDS.toMicros(ts.getSeconds())
                      + TimeUnit.NANOSECONDS.toMicros(ts.getNanos());
            }
            builder.set(field, microSeconds);
          } else {
            builder.set(field, nullValue ? null : row.getLong(fieldIndex));
          }
          break;
        case FLOAT:
          builder.set(field, nullValue ? null : row.getFloat(fieldIndex));
          break;
        case DOUBLE:
          builder.set(field, nullValue ? null : row.getDouble(fieldIndex));
          break;
        case BYTES:
          if (dialect == Dialect.GOOGLE_STANDARD_SQL && spannerType.equals("NUMERIC")) {
            // TODO: uses row.getNumeric() once teleport uses new spanner library.
            builder.set(
                field,
                nullValue
                    ? null
                    : ByteBuffer.wrap(
                        NumericUtils.stringToBytes(row.getBigDecimal(fieldIndex).toString())));
            break;
          }
          if (dialect == Dialect.POSTGRESQL && spannerType.equals("numeric")) {
            builder.set(
                field,
                nullValue
                    ? null
                    : ByteBuffer.wrap(NumericUtils.pgStringToBytes(row.getString(fieldIndex))));
            break;
          }
          builder.set(
              field, nullValue ? null : ByteBuffer.wrap(row.getBytes(fieldIndex).toByteArray()));
          break;
        case STRING:
          // NOTE(review): if no branch below matches the Spanner type, the field is left unset on
          // the builder; presumably checkSupported() rules that out - confirm.
          if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
            if (fieldInfo.matchesStringPattern() || spannerType.equals("JSON")) {
              builder.set(field, nullValue ? null : row.getString(fieldIndex));
            } else if (spannerType.equals("TIMESTAMP")) {
              builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
            } else if (spannerType.equals("DATE")) {
              builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
            } else if (spannerType.equals("UUID")) {
              builder.set(field, nullValue ? null : row.getString(fieldIndex));
            }
          } else if (dialect == Dialect.POSTGRESQL) {
            if (spannerType.equals("jsonb")) {
              builder.set(field, nullValue ? null : row.getPgJsonb(fieldIndex));
            } else if (fieldInfo.matchesVarcharPattern() || spannerType.equals("text")) {
              builder.set(field, nullValue ? null : row.getString(fieldIndex));
            } else if (spannerType.equals("timestamp with time zone")) {
              builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
            } else if (spannerType.equals("spanner.commit_timestamp")) {
              builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
            } else if (spannerType.equals("date")) {
              builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
            } else if (spannerType.equals("uuid")) {
              builder.set(field, nullValue ? null : row.getString(fieldIndex));
            }
          }
          break;
        case ARRAY:
          {
            Schema arrayType = type.getElementType();
            // Array elements must be declared as a union, i.e. nullable.
            boolean arrayElementNullable = arrayType.getType() == Schema.Type.UNION;
            if (!arrayElementNullable) {
              throw new IllegalArgumentException(
                  "Unsupported type for field "
                      + fieldName
                      + ". Cloud Spanner only supports nullable array values");
            }
            arrayType = AvroUtil.unpackNullable(arrayType);
            if (arrayType == null) {
              throw new IllegalArgumentException("Unsupported type for field " + fieldName);
            }
            switch (arrayType.getType()) {
              case BOOLEAN:
                builder.set(field, nullValue ? null : row.getBooleanList(fieldIndex));
                break;
              case LONG:
                if ((dialect == Dialect.GOOGLE_STANDARD_SQL
                        && spannerType.equals("ARRAY<TIMESTAMP>"))
                    || (dialect == Dialect.POSTGRESQL
                        && spannerType.equals("timestamp with time zone[]"))) {
                  // Read the list only when the column is non-null, as every other branch does;
                  // previously the getter was invoked even for a NULL array column.
                  List<Long> values = null;
                  if (!nullValue) {
                    values =
                        row.getTimestampList(fieldIndex).stream()
                            .map(
                                timestamp ->
                                    timestamp == null
                                        ? null
                                        : (TimeUnit.SECONDS.toMicros(timestamp.getSeconds())
                                            + TimeUnit.NANOSECONDS.toMicros(
                                                timestamp.getNanos())))
                            .collect(Collectors.toList());
                  }
                  builder.set(field, values);
                } else {
                  builder.set(field, nullValue ? null : row.getLongList(fieldIndex));
                }
                break;
              case FLOAT:
                {
                  builder.set(field, nullValue ? null : row.getFloatList(fieldIndex));
                  break;
                }
              case DOUBLE:
                {
                  builder.set(field, nullValue ? null : row.getDoubleList(fieldIndex));
                  break;
                }
              case BYTES:
                {
                  if (dialect == Dialect.GOOGLE_STANDARD_SQL
                      && spannerType.equals("ARRAY<NUMERIC>")) {
                    if (nullValue) {
                      builder.set(field, null);
                      break;
                    }
                    List<ByteBuffer> numericValues =
                        row.getStringList(fieldIndex).stream()
                            .map(
                                numeric ->
                                    numeric == null
                                        ? null
                                        : ByteBuffer.wrap(NumericUtils.stringToBytes(numeric)))
                            .collect(Collectors.toList());
                    builder.set(field, numericValues);
                    break;
                  }
                  if (dialect == Dialect.POSTGRESQL && spannerType.equals("numeric[]")) {
                    if (nullValue) {
                      builder.set(field, null);
                      break;
                    }
                    List<ByteBuffer> numericValues =
                        row.getStringList(fieldIndex).stream()
                            .map(
                                numeric ->
                                    numeric == null
                                        ? null
                                        : ByteBuffer.wrap(NumericUtils.pgStringToBytes(numeric)))
                            .collect(Collectors.toList());
                    builder.set(field, numericValues);
                    break;
                  }
                  // Plain byte arrays: wrap each non-null element in a ByteBuffer.
                  List<ByteBuffer> value = null;
                  if (!nullValue) {
                    value =
                        row.getBytesList(fieldIndex).stream()
                            .map(
                                bytes ->
                                    bytes == null ? null : ByteBuffer.wrap(bytes.toByteArray()))
                            .collect(Collectors.toList());
                  }
                  builder.set(field, value);
                  break;
                }
              case STRING:
                {
                  if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
                    if (fieldInfo.matchesArrayPattern()
                        || spannerType.equals("ARRAY<JSON>")
                        || spannerType.equals("ARRAY<UUID>")) {
                      builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
                    } else if (spannerType.equals("ARRAY<TIMESTAMP>")) {
                      setTimestampArray(row, builder, field, fieldIndex, nullValue);
                    } else if (spannerType.equals("ARRAY<DATE>")) {
                      setDateArray(row, builder, field, fieldIndex, nullValue);
                    }
                  }
                  if (dialect == Dialect.POSTGRESQL) {
                    if (spannerType.equals("jsonb[]")) {
                      builder.set(field, nullValue ? null : row.getPgJsonbList(fieldIndex));
                    } else if (fieldInfo.matchesVarcharArrayPattern()
                        || spannerType.equals("text[]")
                        || spannerType.equals("uuid[]")) {
                      builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
                    } else if (spannerType.equals("timestamp with time zone[]")) {
                      setTimestampArray(row, builder, field, fieldIndex, nullValue);
                    } else if (spannerType.equals("date[]")) {
                      setDateArray(row, builder, field, fieldIndex, nullValue);
                    }
                  }
                  break;
                }
              default:
                {
                  throw new IllegalArgumentException("Unsupported array type " + arrayType);
                }
            }
            break;
          }
        default:
          {
            throw new IllegalArgumentException("Unsupported type" + type);
          }
      }
    }
    return builder.build();
  }