public RecordWriter getRecordWriter()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteRecordWriterProvider.java [19:57]


    public RecordWriter getRecordWriter(String filename, OutputStream out) {
        return new RecordWriter() {

            @Override
            public void write(SinkRecord record) throws IOException {
                byte[] value = null;
                byte[] valueBytes = (byte[]) record.value();
                if (filename.contains("avro")) {
                    value = new byte[valueBytes.length];
                    System.arraycopy(valueBytes, 0, value, 0, valueBytes.length);
                } else {
                    byte[] separator = "\n".getBytes(StandardCharsets.UTF_8);
                    byte[] valueWithSeparator = new byte[valueBytes.length + separator.length];
                    System.arraycopy(valueBytes, 0, valueWithSeparator, 0, valueBytes.length);
                    System.arraycopy(separator, 0, valueWithSeparator, valueBytes.length, separator.length);
                    value = valueWithSeparator;
                }
                out.write(value);
            }

            @Override
            public void close() {
                try {
                    out.close();
                } catch (IOException e) {
                    throw new DataException(e);
                }
            }

            @Override
            public void commit() {
                try {
                    out.flush();
                } catch (IOException e) {
                    throw new DataException(e);
                }
            }
        };
    }