public String createJsonPayload()

in src/main/java/software/amazon/event/kafkaconnector/mapping/SinkRecordJsonMapper.java [25:55]


  public String createJsonPayload(SinkRecord sinkRecord) throws IOException {
    var root = objectMapper.createObjectNode();
    root.put("topic", sinkRecord.topic());
    root.put("partition", sinkRecord.kafkaPartition());
    root.put("offset", sinkRecord.kafkaOffset());
    root.put("timestamp", sinkRecord.timestamp());
    root.put("timestampType", sinkRecord.timestampType().toString());
    root.set("headers", createHeaderArray(sinkRecord));

    if (sinkRecord.key() == null) {
      root.set("key", null);
    } else {
      root.set(
          "key",
          createJSONFromByteArray(
              jsonConverter.fromConnectData(
                  sinkRecord.topic(), sinkRecord.keySchema(), sinkRecord.key())));
    }

    // tombstone handling
    if (sinkRecord.value() == null) {
      root.set("value", null);
    } else {
      root.set(
          "value",
          createJSONFromByteArray(
              jsonConverter.fromConnectData(
                  sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value())));
    }
    return root.toString();
  }