in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [306:331]
public void initializeRecordWriter(SinkRecord sinkRecord) {
    if (sinkRecord.value() instanceof Map) {
        // Schemaless JSON: Map values are written out as JSON.
        recordWriterProvider = new JsonRecordWriterProvider();
    } else if ((sinkRecord.valueSchema() != null) && (sinkRecord.valueSchema().type() == Schema.Type.STRUCT)) {
        // Structured records require the target table to use a JSON/MULTIJSON or Avro mapping.
        if (format.equals(IngestionProperties.DataFormat.JSON) || format.equals(IngestionProperties.DataFormat.MULTIJSON)) {
            recordWriterProvider = new JsonRecordWriterProvider();
        } else if (format.equals(IngestionProperties.DataFormat.AVRO)) {
            recordWriterProvider = new AvroRecordWriterProvider();
        } else {
            throw new ConnectException(String.format("Invalid Kusto table mapping, Kafka records of type "
                    + "Avro and JSON can only be ingested to a Kusto table with an Avro or JSON mapping. "
                    + "Currently, it is of type %s.", format));
        }
    } else if ((sinkRecord.valueSchema() == null) || (sinkRecord.valueSchema().type() == Schema.Type.STRING)) {
        // Plain strings, or records without a value schema, are written as-is.
        recordWriterProvider = new StringRecordWriterProvider();
    } else if ((sinkRecord.valueSchema() != null) && (sinkRecord.valueSchema().type() == Schema.Type.BYTES)) {
        // Raw bytes are written as-is; Avro-encoded bytes are passed through untouched.
        recordWriterProvider = new ByteRecordWriterProvider();
        if (format.equals(IngestionProperties.DataFormat.AVRO)) {
            shouldWriteAvroAsBytes = true;
        }
    } else {
        throw new ConnectException(String.format(
                "Invalid Kafka record format, the connector does not support the %s format. "
                        + "Supported formats: Avro, JSON with schema, JSON without schema, byte, and string.",
                sinkRecord.valueSchema().type()));
    }
}
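
For illustration only, a minimal standalone sketch (not part of FileWriter or the connector) that mirrors the dispatch above: describeWriterFor is a hypothetical helper, and plain lowercase strings stand in for IngestionProperties.DataFormat so the sketch only needs the Kafka Connect API on the classpath.

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class WriterDispatchSketch {
    // Hypothetical helper: mirrors the branching in FileWriter.initializeRecordWriter
    // and returns the name of the provider that branch would select.
    static String describeWriterFor(SinkRecord rec, String format) {
        if (rec.value() instanceof Map) {
            return "JsonRecordWriterProvider";              // schemaless JSON (Map values)
        } else if (rec.valueSchema() != null && rec.valueSchema().type() == Schema.Type.STRUCT) {
            if (format.equals("json") || format.equals("multijson")) {
                return "JsonRecordWriterProvider";          // structured record, JSON mapping
            } else if (format.equals("avro")) {
                return "AvroRecordWriterProvider";          // structured record, Avro mapping
            }
            return "error: table mapping must be Avro or JSON, got " + format;
        } else if (rec.valueSchema() == null || rec.valueSchema().type() == Schema.Type.STRING) {
            return "StringRecordWriterProvider";            // plain strings or no value schema
        } else if (rec.valueSchema().type() == Schema.Type.BYTES) {
            return "ByteRecordWriterProvider";              // raw bytes (Avro bytes pass through)
        }
        return "error: unsupported value schema type " + rec.valueSchema().type();
    }

    public static void main(String[] args) {
        SinkRecord stringRecord = new SinkRecord("my-topic", 0, null, null,
                Schema.STRING_SCHEMA, "hello", 0L);
        // Prints "StringRecordWriterProvider": string values ignore the table format.
        System.out.println(describeWriterFor(stringRecord, "csv"));
    }
}

The sketch keeps the same precedence as the real method: Map values win first, then STRUCT records are checked against the table mapping, and only then do the string and byte fallbacks apply.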