public Mutation apply()

in v1/src/main/java/com/google/cloud/teleport/spanner/AvroRecordConverter.java [47:234]


  /**
   * Converts an Avro record into an insert-or-update {@link Mutation} targeting {@code table}.
   *
   * <p>Each field of the record's schema is looked up by name among the table's columns. Fields
   * that map to generated columns are skipped. Every other field is read with the reader that
   * matches the column's type code and bound to the column; an empty reader result is bound as
   * {@code null}.
   *
   * @param record the Avro record to convert
   * @return a mutation with one bound value per non-generated field of the record
   * @throws IllegalArgumentException if a field has no column of the same name, if the column
   *     type (or array element type) is not handled here, or if an array column is paired with
   *     an Avro field that is not an array
   */
  public Mutation apply(GenericRecord record) {
    Schema schema = record.getSchema();
    Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table.name());

    for (Schema.Field field : schema.getFields()) {
      String fieldName = field.name();

      Column column = table.column(fieldName);
      if (column == null) {
        throw new IllegalArgumentException(
            String.format(
                "Cannot find corresponding column for field %s in table %s schema %s",
                fieldName, table.prettyPrint(), schema.toString(true)));
      }

      if (column.isGenerated()) {
        // Spanner will compute generated column values automatically.
        continue;
      }

      Schema avroFieldSchema = unwrapNullableUnion(field.schema());
      LogicalType logicalType = LogicalTypes.fromSchema(avroFieldSchema);
      Schema.Type avroType = avroFieldSchema.getType();

      switch (column.type().getCode()) {
        case BOOL:
        case PG_BOOL:
          builder.set(column.name()).to(readBool(record, avroType, fieldName).orElse(null));
          break;
        case INT64:
        case PG_INT8:
        case ENUM:
          builder.set(column.name()).to(readInt64(record, avroType, fieldName).orElse(null));
          break;
        case FLOAT32:
        case PG_FLOAT4:
          builder.set(column.name()).to(readFloat32(record, avroType, fieldName).orElse(null));
          break;
        case FLOAT64:
        case PG_FLOAT8:
          builder.set(column.name()).to(readFloat64(record, avroType, fieldName).orElse(null));
          break;
        case STRING:
        case PG_VARCHAR:
        case PG_TEXT:
        case JSON:
        case PG_JSONB:
        case UUID:
        case PG_UUID:
          builder.set(column.name()).to(readString(record, avroType, fieldName).orElse(null));
          break;
        case BYTES:
        case PG_BYTEA:
        case PROTO:
          builder.set(column.name()).to(readBytes(record, avroType, fieldName).orElse(null));
          break;
        case TIMESTAMP:
        case PG_TIMESTAMPTZ:
        case PG_SPANNER_COMMIT_TIMESTAMP:
          builder
              .set(column.name())
              .to(readTimestamp(record, avroType, logicalType, fieldName).orElse(null));
          break;
        case DATE:
        case PG_DATE:
          builder
              .set(column.name())
              .to(readDate(record, avroType, logicalType, fieldName).orElse(null));
          break;
        case NUMERIC:
          builder.set(column.name()).to(readNumeric(record, avroType, fieldName).orElse(null));
          break;
        case PG_NUMERIC:
          // PG_NUMERIC is the only scalar wrapped in an explicit Value before binding.
          builder
              .set(column.name())
              .to(Value.pgNumeric(readPgNumeric(record, avroType, fieldName).orElse(null)));
          break;
        case ARRAY:
        case PG_ARRAY:
          setArrayColumn(builder, record, schema, column, avroFieldSchema, fieldName);
          break;
        default:
          throw cannotConvert(fieldName, schema);
      }
    }

    return builder.build();
  }

  /**
   * Binds the array-valued field {@code fieldName} of {@code record} to {@code column}.
   *
   * <p>The reader and the binder method are chosen from the column's array element type code.
   *
   * @param builder the mutation builder receiving the value
   * @param record the Avro record being converted
   * @param schema the record's schema, used only for error messages
   * @param column the destination array column
   * @param arrayFieldSchema the field's Avro schema after nullable-union unwrapping
   * @param fieldName the Avro field name
   * @throws IllegalArgumentException if {@code arrayFieldSchema} is not an Avro array or the
   *     column's element type is not handled here
   */
  private void setArrayColumn(
      Mutation.WriteBuilder builder,
      GenericRecord record,
      Schema schema,
      Column column,
      Schema arrayFieldSchema,
      String fieldName) {
    // Schema.getElementType() throws a context-free AvroRuntimeException on anything that is not
    // an array; fail here instead so the error names the field, schema and table.
    if (arrayFieldSchema.getType() != Schema.Type.ARRAY) {
      throw new IllegalArgumentException(
          String.format(
              "Cannot convert field %s of Avro type %s to an array column in schema %s table %s",
              fieldName, arrayFieldSchema.getType(), schema.toString(true), table.prettyPrint()));
    }

    Schema elementSchema = unwrapNullableUnion(arrayFieldSchema.getElementType());
    LogicalType elementLogicalType = LogicalTypes.fromSchema(elementSchema);
    Schema.Type elementType = elementSchema.getType();

    switch (column.type().getArrayElementType().getCode()) {
      case BOOL:
      case PG_BOOL:
        builder
            .set(column.name())
            .toBoolArray(readBoolArray(record, elementType, fieldName).orElse(null));
        break;
      case INT64:
      case PG_INT8:
      case ENUM:
        builder
            .set(column.name())
            .toInt64Array(readInt64Array(record, elementType, fieldName).orElse(null));
        break;
      case FLOAT32:
      case PG_FLOAT4:
        builder
            .set(column.name())
            .toFloat32Array(readFloat32Array(record, elementType, fieldName).orElse(null));
        break;
      case FLOAT64:
      case PG_FLOAT8:
        builder
            .set(column.name())
            .toFloat64Array(readFloat64Array(record, elementType, fieldName).orElse(null));
        break;
      case STRING:
      case PG_VARCHAR:
      case PG_TEXT:
      case JSON:
      case UUID:
      case PG_UUID:
        builder
            .set(column.name())
            .toStringArray(readStringArray(record, elementType, fieldName).orElse(null));
        break;
      case PG_JSONB:
        // Same reader as the string-like types, but bound through the JSONB-specific binder.
        builder
            .set(column.name())
            .toPgJsonbArray(readStringArray(record, elementType, fieldName).orElse(null));
        break;
      case BYTES:
      case PG_BYTEA:
      case PROTO:
        builder
            .set(column.name())
            .toBytesArray(readBytesArray(record, elementType, fieldName).orElse(null));
        break;
      case TIMESTAMP:
      case PG_TIMESTAMPTZ:
      case PG_SPANNER_COMMIT_TIMESTAMP:
        builder
            .set(column.name())
            .toTimestampArray(
                readTimestampArray(record, elementType, elementLogicalType, fieldName)
                    .orElse(null));
        break;
      case DATE:
      case PG_DATE:
        // NOTE(review): unlike the scalar DATE path, the element logical type is not passed to
        // the reader here - confirm this matches readDateArray's contract.
        builder
            .set(column.name())
            .toDateArray(readDateArray(record, elementType, fieldName).orElse(null));
        break;
      case NUMERIC:
        // NOTE(review): NUMERIC arrays are bound through toStringArray - confirm this is the
        // intended binder for the value readNumericArray produces.
        builder
            .set(column.name())
            .toStringArray(readNumericArray(record, elementType, fieldName).orElse(null));
        break;
      case PG_NUMERIC:
        builder
            .set(column.name())
            .toPgNumericArray(readPgNumericArray(record, elementType, fieldName).orElse(null));
        break;
      default:
        throw cannotConvert(fieldName, schema);
    }
  }

  /**
   * Returns the result of {@code AvroUtil.unpackNullable} for a union schema, falling back to the
   * given schema when that result is {@code null} or when the schema is not a union.
   */
  private Schema unwrapNullableUnion(Schema avroSchema) {
    if (avroSchema.getType() != Schema.Type.UNION) {
      return avroSchema;
    }
    Schema unpacked = AvroUtil.unpackNullable(avroSchema);
    return unpacked != null ? unpacked : avroSchema;
  }

  /** Builds the exception thrown when a field's column type has no conversion in this class. */
  private IllegalArgumentException cannotConvert(String fieldName, Schema schema) {
    return new IllegalArgumentException(
        String.format(
            "Cannot convert field %s in schema %s table %s",
            fieldName, schema.toString(true), table.prettyPrint()));
  }