public RecordWriter getRecordWriter()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterProvider.java [26:72]


    /**
     * Creates a {@link RecordWriter} that serializes sink records as an Avro container file
     * written to {@code out}.
     *
     * <p>The Avro file is opened lazily on the first call to {@code write}, using the value
     * schema of that first record; the same schema is then used to convert every subsequent
     * record handed to this writer.
     *
     * @param filename name of the target file; used only in the debug log message
     * @param out stream that receives the Avro container data
     * @return a writer bound to {@code out}
     */
    public RecordWriter getRecordWriter(String filename, OutputStream out) {
        return new RecordWriter() {
            final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
            // Connect value schema of the first record; reused for all later conversions.
            Schema schema;
            // True once writer.create(...) has completed. Initialization is keyed on this flag
            // rather than on "schema == null" so that a record whose value schema is null does
            // not cause create(...) to be invoked a second time on an already-open writer.
            boolean opened;

            /**
             * Converts the record value to Avro and appends it, opening the file first if needed.
             *
             * @throws IOException if appending to the underlying stream fails
             * @throws ConnectException if the Avro file cannot be opened
             */
            @Override
            public void write(SinkRecord record) throws IOException {
                if (!opened) {
                    Schema valueSchema = record.valueSchema();
                    try {
                        log.debug("Opening record writer for: {}", filename);
                        org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(valueSchema);
                        writer.setFlushOnEveryBlock(true);
                        writer.create(avroSchema, out);
                    } catch (IOException e) {
                        throw new ConnectException(e);
                    }
                    // Record state only after the open succeeded, so a failed open is never
                    // mistaken for a ready writer by a later call.
                    schema = valueSchema;
                    opened = true;
                }
                Object value = avroData.fromConnectData(schema, record.value());
                // AvroData wraps primitive types so their schema can be included. We need to unwrap
                // NonRecordContainers to just their value to properly handle these types
                if (value instanceof NonRecordContainer) {
                    writer.append(((NonRecordContainer) value).getValue());
                } else {
                    writer.append(value);
                }
            }

            /**
             * Closes the Avro writer.
             *
             * @throws DataException if closing fails with an I/O error
             */
            @Override
            public void close() {
                try {
                    writer.close();
                } catch (IOException e) {
                    throw new DataException(e);
                }
            }

            /**
             * Flushes buffered Avro data to the output stream.
             *
             * @throws DataException if flushing fails with an I/O error
             */
            @Override
            public void commit() {
                if (!opened) {
                    // Nothing has been written yet; DataFileWriter.flush() requires an open
                    // file and would otherwise fail with an unchecked "not open" error.
                    return;
                }
                try {
                    writer.flush();
                } catch (IOException e) {
                    throw new DataException(e);
                }
            }
        };
    }