in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [404:438]
/**
 * Starts the task: builds the sink configuration from the supplied settings,
 * validates the table mappings, prepares the optional dead-letter queue (DLQ)
 * producer, resolves the per-topic ingestion properties and creates the Kusto
 * ingest client. When a task context is present, the currently assigned
 * partitions are opened as the final step.
 *
 * @param props raw settings passed to the {@code KustoSinkConfig} constructor
 * @throws ConnectException if the DLQ producer cannot be constructed
 */
public void start(Map<String, String> props) {
    config = new KustoSinkConfig(props);
    String ingestUrl = config.getKustoIngestUrl();
    validateTableMappings(config);

    boolean dlqRequested = config.isDlqEnabled();
    if (!dlqRequested) {
        // No DLQ configured: clear every DLQ-related field.
        dlqProducer = null;
        isDlqEnabled = false;
        dlqTopicName = null;
    } else {
        isDlqEnabled = true;
        dlqTopicName = config.getDlqTopicName();
        Properties dlqProps = config.getDlqProps();
        // Only the property names are logged, not their values.
        log.info("Initializing miscellaneous dead-letter queue producer with the following properties: {}",
                dlqProps.keySet());
        try {
            dlqProducer = new KafkaProducer<>(dlqProps);
        } catch (Exception ex) {
            throw new ConnectException("Failed to initialize producer for miscellaneous dead-letter queue", ex);
        }
    }

    topicsToIngestionProps = getTopicsToIngestionProps(config);
    // this should be read properly from settings
    createKustoIngestClient(config);
    log.info("Started KustoSinkTask with target cluster: ({}), source topics: ({})", ingestUrl,
            topicsToIngestionProps.keySet());

    // The context may be absent (e.g. when the task is driven directly from tests),
    // in which case there is no assignment to open.
    if (context == null) {
        return;
    }
    open(context.assignment());
}