public static Map getTopicsToIngestionProps()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [140:179]


    /**
     * Builds a lookup from topic name to the ingestion settings configured for that topic.
     *
     * <p>For every entry returned by {@code config.getTopicToTableMapping()} an
     * {@link IngestionProperties} is created for the entry's database and table. When the entry
     * declares a non-empty format, any format accepted by {@code isDataFormatAnyTypeOfJson} is
     * set as {@code MULTIJSON}; every other format string is passed through unchanged. When the
     * entry also names an ingestion mapping (and its format is not {@code null}), the mapping is
     * attached with a kind chosen from the format: JSON, AVRO, APACHEAVRO, or CSV as the fallback.
     *
     * <p>If several entries share a topic, the last one processed replaces the earlier ones.
     *
     * @param config connector configuration supplying the topic-to-table mappings
     * @return a new mutable map keyed by topic name
     * @throws ConfigException wrapping any exception raised while reading or translating the mappings
     */
    public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) {
        try {
            Map<String, TopicIngestionProperties> propsByTopic = new HashMap<>();

            for (TopicToTableMapping entry : config.getTopicToTableMapping()) {
                IngestionProperties ingestionProps = new IngestionProperties(entry.getDb(), entry.getTable());

                // Data format: every JSON flavour is ingested as MULTIJSON; anything else is passed as-is.
                String dataFormat = entry.getFormat();
                if (StringUtils.isNotEmpty(dataFormat)) {
                    if (isDataFormatAnyTypeOfJson(dataFormat)) {
                        ingestionProps.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
                    } else {
                        ingestionProps.setDataFormat(dataFormat);
                    }
                }

                // Ingestion mapping: only attached when a mapping reference is given and a format is present
                // (an empty-but-non-null format still qualifies and ends up with the CSV kind).
                String mappingName = entry.getMapping();
                if (dataFormat != null && StringUtils.isNotEmpty(mappingName)) {
                    IngestionMapping.IngestionMappingKind mappingKind;
                    if (isDataFormatAnyTypeOfJson(dataFormat)) {
                        mappingKind = IngestionMapping.IngestionMappingKind.JSON;
                    } else if (IngestionProperties.DataFormat.AVRO.toString().equalsIgnoreCase(dataFormat)) {
                        mappingKind = IngestionMapping.IngestionMappingKind.AVRO;
                    } else if (IngestionProperties.DataFormat.APACHEAVRO.toString().equalsIgnoreCase(dataFormat)) {
                        mappingKind = IngestionMapping.IngestionMappingKind.APACHEAVRO;
                    } else {
                        mappingKind = IngestionMapping.IngestionMappingKind.CSV;
                    }
                    ingestionProps.setIngestionMapping(mappingName, mappingKind);
                }

                TopicIngestionProperties perTopic = new TopicIngestionProperties();
                perTopic.ingestionProperties = ingestionProps;
                perTopic.streaming = entry.isStreaming();
                propsByTopic.put(entry.getTopic(), perTopic);
            }

            return propsByTopic;
        } catch (Exception ex) {
            throw new ConfigException("Error while parsing kusto ingestion properties.", ex);
        }
    }