public void initializeRecordWriter()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java [306:331]


    public void initializeRecordWriter(SinkRecord sinkRecord) {
        if (sinkRecord.value() instanceof Map) {
            recordWriterProvider = new JsonRecordWriterProvider();
        } else if ((sinkRecord.valueSchema() != null) && (sinkRecord.valueSchema().type() == Schema.Type.STRUCT)) {
            if (format.equals(IngestionProperties.DataFormat.JSON) || format.equals(IngestionProperties.DataFormat.MULTIJSON)) {
                recordWriterProvider = new JsonRecordWriterProvider();
            } else if (format.equals(IngestionProperties.DataFormat.AVRO)) {
                recordWriterProvider = new AvroRecordWriterProvider();
            } else {
                throw new ConnectException(String.format("Invalid Kusto table mapping, Kafka records of type "
                        + "Avro and JSON can only be ingested to Kusto table having Avro or JSON mapping. "
                        + "Currently, it is of type %s.", format));
            }
        } else if ((sinkRecord.valueSchema() == null) || (sinkRecord.valueSchema().type() == Schema.Type.STRING)) {
            recordWriterProvider = new StringRecordWriterProvider();
        } else if ((sinkRecord.valueSchema() != null) && (sinkRecord.valueSchema().type() == Schema.Type.BYTES)) {
            recordWriterProvider = new ByteRecordWriterProvider();
            if (format.equals(IngestionProperties.DataFormat.AVRO)) {
                shouldWriteAvroAsBytes = true;
            }
        } else {
            throw new ConnectException(String.format(
                    "Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",
                    sinkRecord.valueSchema().type()));
        }
    }