in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [140:179]
public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) {
Map<String, TopicIngestionProperties> result = new HashMap<>();
try {
TopicToTableMapping[] mappings = config.getTopicToTableMapping();
for (TopicToTableMapping mapping : mappings) {
IngestionProperties props = new IngestionProperties(mapping.getDb(), mapping.getTable());
String format = mapping.getFormat();
if (StringUtils.isNotEmpty(format)) {
if (isDataFormatAnyTypeOfJson(format)) {
props.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
} else {
props.setDataFormat(format);
}
}
String mappingRef = mapping.getMapping();
if (StringUtils.isNotEmpty(mappingRef) && format != null) {
if (isDataFormatAnyTypeOfJson(format)) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.JSON);
} else if (format.equalsIgnoreCase(IngestionProperties.DataFormat.AVRO.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.AVRO);
} else if (format.equalsIgnoreCase(IngestionProperties.DataFormat.APACHEAVRO.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.APACHEAVRO);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.CSV);
}
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.ingestionProperties = props;
topicIngestionProperties.streaming = mapping.isStreaming();
result.put(mapping.getTopic(), topicIngestionProperties);
}
return result;
} catch (Exception ex) {
throw new ConfigException("Error while parsing kusto ingestion properties.", ex);
}
}