in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/ValueMapper.java [93:158]
public MutationDataBuilder getRecordMutationDataBuilder(
    SchemaAndValue kafkaValueAndSchema, String topic, long timestampMicros) {
  // Translates one Kafka record value (plus its optional schema) into the operations recorded on
  // a freshly created MutationDataBuilder, which is always returned (possibly with nothing
  // recorded on it).
  //
  // Shape of the mapping, level by level:
  //   * root value      -> the whole row
  //   * root Struct field   -> first argument of setCell/deleteFamily (same position as the
  //                            default column family)
  //   * nested Struct field -> second argument of setCell/deleteCells (the qualifier position)
  //
  // At every level a null value is handled according to `nullMode`: IGNORE skips it, DELETE
  // records a delete of the matching scope, and any other mode falls through to the regular
  // write path, where the null is handed to serialize().
  Object rootValue = kafkaValueAndSchema.value();
  Optional<Schema> rootSchema = Optional.ofNullable(kafkaValueAndSchema.schema());
  logIfLogicalTypeUnsupported(rootSchema);
  MutationDataBuilder builder = createMutationDataBuilder();

  if (rootValue == null) {
    if (nullMode == NullValueMode.IGNORE) {
      // Nothing to record for this record.
      return builder;
    }
    if (nullMode == NullValueMode.DELETE) {
      builder.deleteRow();
      return builder;
    }
    // Any other null mode: continue below and treat the null like a non-Struct value.
  }

  if (!(rootValue instanceof Struct)) {
    // A non-Struct root can only be written when both the default column family and the default
    // column qualifier are configured; otherwise nothing is recorded for it.
    if (defaultColumnFamilyTemplate != null && defaultColumnQualifier != null) {
      builder.setCell(
          getDefaultColumnFamily(topic),
          defaultColumnQualifier,
          timestampMicros,
          ByteString.copyFrom(serialize(rootValue, rootSchema)));
    }
    return builder;
  }

  for (Map.Entry<Object, SchemaAndValue> familyEntry :
      getChildren((Struct) rootValue, rootSchema)) {
    String familyName = familyEntry.getKey().toString();
    Object familyValue = familyEntry.getValue().value();
    Optional<Schema> familySchema = Optional.ofNullable(familyEntry.getValue().schema());
    logIfLogicalTypeUnsupported(familySchema);

    if (familyValue == null) {
      if (nullMode == NullValueMode.IGNORE) {
        continue;
      }
      if (nullMode == NullValueMode.DELETE) {
        builder.deleteFamily(familyName);
        continue;
      }
      // Any other null mode: handled as a non-Struct field just below.
    }

    if (!(familyValue instanceof Struct)) {
      // A non-Struct field is stored in the default column family, using the field name as the
      // qualifier. Without a default column family nothing is recorded for it.
      if (defaultColumnFamilyTemplate != null) {
        builder.setCell(
            getDefaultColumnFamily(topic),
            ByteString.copyFrom(familyName.getBytes(StandardCharsets.UTF_8)),
            timestampMicros,
            ByteString.copyFrom(serialize(familyValue, familySchema)));
      }
      continue;
    }

    for (Map.Entry<Object, SchemaAndValue> columnEntry :
        getChildren((Struct) familyValue, familySchema)) {
      ByteString qualifier =
          ByteString.copyFrom(columnEntry.getKey().toString().getBytes(StandardCharsets.UTF_8));
      Object cellValue = columnEntry.getValue().value();
      Optional<Schema> cellSchema = Optional.ofNullable(columnEntry.getValue().schema());
      logIfLogicalTypeUnsupported(cellSchema);

      boolean cellIsNull = cellValue == null;
      if (cellIsNull && nullMode == NullValueMode.IGNORE) {
        continue;
      }
      if (cellIsNull && nullMode == NullValueMode.DELETE) {
        // The delete covers the timestamp range built from 0 and this record's timestamp.
        builder.deleteCells(
            familyName, qualifier, Range.TimestampRange.create(0, timestampMicros));
        continue;
      }
      builder.setCell(
          familyName,
          qualifier,
          timestampMicros,
          ByteString.copyFrom(serialize(cellValue, cellSchema)));
    }
  }
  return builder;
}