public void sendFailedRecordToDlq()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java [185:205]


    public void sendFailedRecordToDlq(SinkRecord sinkRecord) {
        byte[] recordKey = String.format("Failed to write sinkRecord to KustoDB with the following kafka coordinates, "
                + "topic=%s, partition=%s, offset=%s.",
                sinkRecord.topic(),
                sinkRecord.kafkaPartition(),
                sinkRecord.kafkaOffset()).getBytes(StandardCharsets.UTF_8);
        byte[] recordValue = sinkRecord.value().toString().getBytes(StandardCharsets.UTF_8);
        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(dlqTopicName, recordKey, recordValue);
        try {
            dlqProducer.send(dlqRecord, (recordMetadata, exception) -> {
                if (exception != null) {
                    throw new KafkaException(
                            String.format("Failed to write records to miscellaneous dead-letter queue topic=%s.", dlqTopicName),
                            exception);
                }
            });
        } catch (IllegalStateException e) {
            log.error("Failed to write records to miscellaneous dead-letter queue topic, "
                    + "kafka producer has already been closed. Exception={0}", e);
        }
    }