public static Struct beamRowToStruct()

in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/StructUtils.java [126:222]


  public static Struct beamRowToStruct(Row row) {
    Struct.Builder structBuilder = Struct.newBuilder();
    List<Schema.Field> fields = row.getSchema().getFields();
    fields.forEach(
        field -> {
          String column = field.getName();
          switch (field.getType().getTypeName()) {
            case ROW:
              @Nullable Row subRow = row.getRow(column);
              if (subRow == null) {
                structBuilder.set(column).to(beamTypeToSpannerType(field.getType()), null);
              } else {
                structBuilder
                    .set(column)
                    .to(beamTypeToSpannerType(field.getType()), beamRowToStruct(subRow));
              }
              break;
            case ARRAY:
              addIterableToStructBuilder(structBuilder, row.getArray(column), field);
              break;
            case ITERABLE:
              addIterableToStructBuilder(structBuilder, row.getIterable(column), field);
              break;
            case FLOAT:
              structBuilder.set(column).to(row.getFloat(column));
              break;
            case DOUBLE:
              structBuilder.set(column).to(row.getDouble(column));
              break;
            case INT16:
              @Nullable Short int16 = row.getInt16(column);
              if (int16 == null) {
                structBuilder.set(column).to((Long) null);
              } else {
                structBuilder.set(column).to(int16);
              }
              break;
            case INT32:
              @Nullable Integer int32 = row.getInt32(column);
              if (int32 == null) {
                structBuilder.set(column).to((Long) null);
              } else {
                structBuilder.set(column).to(int32);
              }
              break;
            case INT64:
              structBuilder.set(column).to(row.getInt64(column));
              break;
            case DECIMAL:
              @Nullable BigDecimal decimal = row.getDecimal(column);
              // BigDecimal is not nullable
              if (decimal == null) {
                checkNotNull(decimal, "Null decimal at column " + column);
              } else {
                structBuilder.set(column).to(decimal);
              }
              break;
              // TODO: implement logical type date and timestamp
            case DATETIME:
              @Nullable ReadableDateTime dateTime = row.getDateTime(column);
              if (dateTime == null) {
                structBuilder.set(column).to((Timestamp) null);
              } else {
                structBuilder.set(column).to(Timestamp.parseTimestamp(dateTime.toString()));
              }
              break;
            case STRING:
              structBuilder.set(column).to(row.getString(column));
              break;
            case BYTE:
              @Nullable Byte byteValue = row.getByte(column);
              if (byteValue == null) {
                structBuilder.set(column).to((Long) null);
              } else {
                structBuilder.set(column).to(byteValue);
              }
              break;
            case BYTES:
              byte @Nullable [] bytes = row.getBytes(column);
              if (bytes == null) {
                structBuilder.set(column).to((ByteArray) null);
              } else {
                structBuilder.set(column).to(ByteArray.copyFrom(bytes));
              }
              break;
            case BOOLEAN:
              structBuilder.set(column).to(row.getBoolean(column));
              break;
            default:
              throw new IllegalArgumentException(
                  String.format(
                      "Unsupported beam type '%s' while translating row to struct.",
                      field.getType().getTypeName()));
          }
        });
    return structBuilder.build();
  }