private static void setBeamValueToMutation()

in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/MutationUtils.java [181:280]


  private static void setBeamValueToMutation(
      Mutation.WriteBuilder mutationBuilder,
      Schema.FieldType fieldType,
      String columnName,
      Row row) {
    switch (fieldType.getTypeName()) {
      case BYTE:
        @Nullable Byte byteValue = row.getByte(columnName);
        if (byteValue == null) {
          mutationBuilder.set(columnName).to(((Long) null));
        } else {
          mutationBuilder.set(columnName).to(byteValue);
        }
        break;
      case INT16:
        @Nullable Short int16 = row.getInt16(columnName);
        if (int16 == null) {
          mutationBuilder.set(columnName).to(((Long) null));
        } else {
          mutationBuilder.set(columnName).to(int16);
        }
        break;
      case INT32:
        @Nullable Integer int32 = row.getInt32(columnName);
        if (int32 == null) {
          mutationBuilder.set(columnName).to(((Long) null));
        } else {
          mutationBuilder.set(columnName).to(int32);
        }
        break;
      case INT64:
        mutationBuilder.set(columnName).to(row.getInt64(columnName));
        break;
      case FLOAT:
        mutationBuilder.set(columnName).to(row.getFloat(columnName));
        break;
      case DOUBLE:
        mutationBuilder.set(columnName).to(row.getDouble(columnName));
        break;
      case DECIMAL:
        @Nullable BigDecimal decimal = row.getDecimal(columnName);
        // BigDecimal is not nullable
        if (decimal == null) {
          checkNotNull(decimal, "Null decimal at column " + columnName);
        } else {
          mutationBuilder.set(columnName).to(decimal);
        }
        break;
        // TODO: Implement logical date and datetime
      case DATETIME:
        @Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
        if (dateTime == null) {
          mutationBuilder.set(columnName).to(((Timestamp) null));
        } else {
          mutationBuilder
              .set(columnName)
              .to(Timestamp.ofTimeMicroseconds(dateTime.toInstant().getMillis() * 1000L));
        }
        break;
      case BOOLEAN:
        mutationBuilder.set(columnName).to(row.getBoolean(columnName));
        break;
      case STRING:
        mutationBuilder.set(columnName).to(row.getString(columnName));
        break;
      case BYTES:
        byte @Nullable [] bytes = row.getBytes(columnName);
        if (bytes == null) {
          mutationBuilder.set(columnName).to(((ByteArray) null));
        } else {
          mutationBuilder.set(columnName).to(ByteArray.copyFrom(bytes));
        }
        break;
      case ROW:
        @Nullable Row subRow = row.getRow(columnName);
        if (subRow == null) {
          mutationBuilder
              .set(columnName)
              .to(beamTypeToSpannerType(row.getSchema().getField(columnName).getType()), null);
        } else {
          mutationBuilder
              .set(columnName)
              .to(
                  beamTypeToSpannerType(row.getSchema().getField(columnName).getType()),
                  beamRowToStruct(subRow));
        }
        break;
      case ARRAY:
        addIterableToMutationBuilder(
            mutationBuilder, row.getArray(columnName), row.getSchema().getField(columnName));
        break;
      case ITERABLE:
        addIterableToMutationBuilder(
            mutationBuilder, row.getIterable(columnName), row.getSchema().getField(columnName));
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Unsupported field type: %s", fieldType.getTypeName()));
    }
  }