in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java [185:205]
public void sendFailedRecordToDlq(SinkRecord sinkRecord) {
byte[] recordKey = String.format("Failed to write sinkRecord to KustoDB with the following kafka coordinates, "
+ "topic=%s, partition=%s, offset=%s.",
sinkRecord.topic(),
sinkRecord.kafkaPartition(),
sinkRecord.kafkaOffset()).getBytes(StandardCharsets.UTF_8);
byte[] recordValue = sinkRecord.value().toString().getBytes(StandardCharsets.UTF_8);
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(dlqTopicName, recordKey, recordValue);
try {
dlqProducer.send(dlqRecord, (recordMetadata, exception) -> {
if (exception != null) {
throw new KafkaException(
String.format("Failed to write records to miscellaneous dead-letter queue topic=%s.", dlqTopicName),
exception);
}
});
} catch (IllegalStateException e) {
log.error("Failed to write records to miscellaneous dead-letter queue topic, "
+ "kafka producer has already been closed. Exception={0}", e);
}
}