in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [310:343]
void validateTableMappings(KustoSinkConfig config) {
List<String> databaseTableErrorList = new ArrayList<>();
List<String> accessErrorList = new ArrayList<>();
boolean enableTableValidation = config.getEnableTableValidation();
try {
Client engineClient = createKustoEngineClient(config);
if (config.getTopicToTableMapping() != null) {
TopicToTableMapping[] mappings = config.getTopicToTableMapping();
if (enableTableValidation && mappings.length > 0 && (isIngestorRole(mappings[0], engineClient))) {
for (TopicToTableMapping mapping : mappings) {
validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
}
}
}
String tableAccessErrorMessage = "";
if (!databaseTableErrorList.isEmpty()) {
tableAccessErrorMessage = "\n\nError occurred while trying to access the following database:table\n" +
String.join("\n", databaseTableErrorList);
}
if (!accessErrorList.isEmpty()) {
tableAccessErrorMessage = tableAccessErrorMessage + "\n\nUser does not have appropriate permissions " +
"to sink data into the Kusto database:table combination(s). " +
"Verify your Kusto principals and roles before proceeding for the following: \n " +
String.join("\n", accessErrorList);
}
if (!tableAccessErrorMessage.isEmpty()) {
throw new ConnectException(tableAccessErrorMessage);
}
} catch (JsonProcessingException e) {
throw new ConnectException("Failed to parse ``kusto.tables.topics.mapping`` configuration.", e);
}
}