public MutationDataBuilder getRecordMutationDataBuilder()

in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/ValueMapper.java [93:158]


  /**
   * Translates a Kafka record value into the mutations recorded on a fresh {@link
   * MutationDataBuilder}.
   *
   * <p>The root value is dispatched on its shape:
   *
   * <ul>
   *   <li>{@code null} root: nothing is recorded under {@link NullValueMode#IGNORE}; {@code
   *       deleteRow()} is recorded under {@link NullValueMode#DELETE}. Under any other null mode
   *       the {@code null} is treated like a non-{@link Struct} value.
   *   <li>{@link Struct} root: every child returned by {@code getChildren} is processed
   *       individually.
   *   <li>Anything else: the serialized value is written to the default column family and the
   *       default column qualifier, but only when both are configured (non-{@code null}).
   * </ul>
   *
   * @param kafkaValueAndSchema the record value together with its (possibly {@code null}) schema
   * @param topic the topic passed to {@code getDefaultColumnFamily} when the default column family
   *     is needed
   * @param timestampMicros the timestamp used for every written cell and as the second argument of
   *     the timestamp range used for cell deletions
   * @return the builder holding the recorded mutations; it has no recorded calls when the input
   *     was ignored or no default column family/qualifier was configured
   */
  public MutationDataBuilder getRecordMutationDataBuilder(
      SchemaAndValue kafkaValueAndSchema, String topic, long timestampMicros) {
    final Object rootValue = kafkaValueAndSchema.value();
    final Optional<Schema> rootSchema = Optional.ofNullable(kafkaValueAndSchema.schema());
    logIfLogicalTypeUnsupported(rootSchema);
    final MutationDataBuilder builder = createMutationDataBuilder();

    if (rootValue == null) {
      if (nullMode == NullValueMode.IGNORE) {
        return builder;
      }
      if (nullMode == NullValueMode.DELETE) {
        builder.deleteRow();
        return builder;
      }
      // Any other null mode: fall through, the null is serialized like a regular value.
    }

    if (!(rootValue instanceof Struct)) {
      if (defaultColumnFamilyTemplate != null && defaultColumnQualifier != null) {
        builder.setCell(
            getDefaultColumnFamily(topic),
            defaultColumnQualifier,
            timestampMicros,
            ByteString.copyFrom(serialize(rootValue, rootSchema)));
      }
      return builder;
    }

    for (Map.Entry<Object, SchemaAndValue> fieldEntry :
        getChildren((Struct) rootValue, rootSchema)) {
      appendFieldMutations(builder, fieldEntry, topic, timestampMicros);
    }
    return builder;
  }

  /**
   * Records the mutations for a single top-level field of a root {@link Struct}.
   *
   * <p>A {@code null} field is skipped under {@link NullValueMode#IGNORE} and recorded as {@code
   * deleteFamily(fieldName)} under {@link NullValueMode#DELETE}. A nested {@link Struct} has each
   * of its children handled with the field name as the column family. Every other value
   * (including a {@code null} under any other null mode) is written to the default column family
   * with the UTF-8 encoded field name as the column qualifier, provided a default column family is
   * configured.
   */
  private void appendFieldMutations(
      MutationDataBuilder builder,
      Map.Entry<Object, SchemaAndValue> fieldEntry,
      String topic,
      long timestampMicros) {
    final String fieldName = fieldEntry.getKey().toString();
    final Object fieldValue = fieldEntry.getValue().value();
    final Optional<Schema> fieldSchema = Optional.ofNullable(fieldEntry.getValue().schema());
    logIfLogicalTypeUnsupported(fieldSchema);

    if (fieldValue == null) {
      if (nullMode == NullValueMode.IGNORE) {
        return;
      }
      if (nullMode == NullValueMode.DELETE) {
        builder.deleteFamily(fieldName);
        return;
      }
      // Any other null mode: fall through, the null is serialized like a regular value.
    }

    if (fieldValue instanceof Struct) {
      for (Map.Entry<Object, SchemaAndValue> subfieldEntry :
          getChildren((Struct) fieldValue, fieldSchema)) {
        appendSubfieldMutation(builder, fieldName, subfieldEntry, timestampMicros);
      }
      return;
    }

    if (defaultColumnFamilyTemplate != null) {
      builder.setCell(
          getDefaultColumnFamily(topic),
          ByteString.copyFrom(fieldName.getBytes(StandardCharsets.UTF_8)),
          timestampMicros,
          ByteString.copyFrom(serialize(fieldValue, fieldSchema)));
    }
  }

  /**
   * Records the mutation for a single child of a second-level {@link Struct}.
   *
   * <p>The enclosing field name is the column family and the UTF-8 encoded subfield name is the
   * column qualifier. A {@code null} subfield is skipped under {@link NullValueMode#IGNORE} and,
   * under {@link NullValueMode#DELETE}, recorded as a {@code deleteCells} call over the timestamp
   * range created from {@code 0} and {@code timestampMicros}. Everything else is serialized and
   * written as a cell; values are not descended into any further.
   */
  private void appendSubfieldMutation(
      MutationDataBuilder builder,
      String columnFamily,
      Map.Entry<Object, SchemaAndValue> subfieldEntry,
      long timestampMicros) {
    final ByteString columnQualifier =
        ByteString.copyFrom(subfieldEntry.getKey().toString().getBytes(StandardCharsets.UTF_8));
    final Object subfieldValue = subfieldEntry.getValue().value();
    final Optional<Schema> subfieldSchema =
        Optional.ofNullable(subfieldEntry.getValue().schema());
    logIfLogicalTypeUnsupported(subfieldSchema);

    if (subfieldValue == null) {
      if (nullMode == NullValueMode.IGNORE) {
        return;
      }
      if (nullMode == NullValueMode.DELETE) {
        builder.deleteCells(
            columnFamily, columnQualifier, Range.TimestampRange.create(0, timestampMicros));
        return;
      }
      // Any other null mode: fall through, the null is serialized like a regular value.
    }

    builder.setCell(
        columnFamily,
        columnQualifier,
        timestampMicros,
        ByteString.copyFrom(serialize(subfieldValue, subfieldSchema)));
  }