in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [363:379]
public void open(Collection<TopicPartition> partitions) {
    // Registers the given partitions with this task and creates, opens and stores one
    // TopicPartitionWriter for each of them.
    //
    // partitions: the topic partitions handed to this call.
    // Throws ConnectException when a partition's topic has no ingestion properties mapped.
    assignment.addAll(partitions);

    // Walk only the partitions passed to this call, not the whole accumulated assignment.
    // Looping over `assignment` would build a new writer for every partition that is already
    // registered and replace its entry in `writers`; this method never closes the writer
    // being replaced.
    for (TopicPartition tp : partitions) {
        TopicIngestionProperties ingestionProps = getIngestionProps(tp.topic());
        log.debug("Open Kusto topic: '{}' with partition: '{}'", tp.topic(), tp.partition());

        // Fail fast on a topic that has no ingestion mapping configured.
        if (ingestionProps == null) {
            throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped " +
                    "for the topic: %s. please check your configuration.", tp.topic()));
        }

        // The per-topic streaming flag decides which ingest client the writer uses.
        IngestClient client = ingestionProps.streaming ? streamingIngestClient : kustoIngestClient;
        TopicPartitionWriter writer = new TopicPartitionWriter(tp, client, ingestionProps, config, isDlqEnabled,
                dlqTopicName, dlqProducer);
        writer.open();
        writers.put(tp, writer);
    }
}