in src/main/java/software/amazon/event/kafkaconnector/mapping/SinkRecordJsonMapper.java [25:55]
public String createJsonPayload(SinkRecord sinkRecord) throws IOException {
var root = objectMapper.createObjectNode();
root.put("topic", sinkRecord.topic());
root.put("partition", sinkRecord.kafkaPartition());
root.put("offset", sinkRecord.kafkaOffset());
root.put("timestamp", sinkRecord.timestamp());
root.put("timestampType", sinkRecord.timestampType().toString());
root.set("headers", createHeaderArray(sinkRecord));
if (sinkRecord.key() == null) {
root.set("key", null);
} else {
root.set(
"key",
createJSONFromByteArray(
jsonConverter.fromConnectData(
sinkRecord.topic(), sinkRecord.keySchema(), sinkRecord.key())));
}
// tombstone handling
if (sinkRecord.value() == null) {
root.set("value", null);
} else {
root.set(
"value",
createJSONFromByteArray(
jsonConverter.fromConnectData(
sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value())));
}
return root.toString();
}