public RecordWriter getRecordWriter()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProvider.java [39:91]


    /**
     * Creates a {@link RecordWriter} that serializes sink records to {@code out}, one
     * record per line.
     *
     * <p>{@code Struct} values are converted to bytes with {@code converter} and written
     * directly to {@code out}; every other value is written through a Jackson
     * {@link JsonGenerator} wrapping the same stream. Because the generator may hold
     * output in its own buffer, anything it still has pending is flushed before bytes
     * are written directly to {@code out}, so records reach the stream in the order in
     * which they were written.
     *
     * @param filename name of the target file; used for logging only
     * @param out      stream that receives the serialized records; closed by
     *                 {@link RecordWriter#close()}
     * @return a writer bound to {@code out}
     * @throws DataException if the JSON generator cannot be created, or if flushing or
     *                       closing the returned writer fails
     */
    public RecordWriter getRecordWriter(final String filename, OutputStream out) {
        try {
            log.debug("Opening record writer for: {}", filename);
            return new RecordWriter() {
                final JsonGenerator writer = mapper.getFactory()
                        .createGenerator(out)
                        .setRootValueSeparator(null);

                // True while the generator may hold output that has not been flushed to `out`.
                private boolean generatorHasPendingOutput;

                /**
                 * Writes a single record followed by a line separator.
                 *
                 * @throws ConnectException if the underlying stream fails
                 */
                @Override
                public void write(SinkRecord record) {
                    log.trace("Sink record: {}", record);
                    try {
                        Object value = record.value();
                        if (value instanceof Struct) {
                            byte[] rawJson = converter.fromConnectData(record.topic(), record.valueSchema(), value);
                            if (ArrayUtils.isEmpty(rawJson)) {
                                log.warn("Filtering empty records post-serialization. Record filtered {}", record); // prints everything
                            } else {
                                // These bytes bypass the generator, so push out whatever it
                                // still buffers first; otherwise earlier records (or their
                                // trailing line separator) could land after this one.
                                flushGeneratorIfNeeded();
                                out.write(rawJson);
                                out.write(LINE_SEPARATOR_BYTES);
                            }
                        } else {
                            // Mark before writing so a partially written value is still flushed.
                            generatorHasPendingOutput = true;
                            writer.writeObject(value);
                            writer.writeRaw(LINE_SEPARATOR);
                        }
                    } catch (IOException e) {
                        throw new ConnectException(e);
                    }
                }

                /** Flushes everything buffered by the generator. */
                @Override
                public void commit() {
                    try {
                        writer.flush();
                        generatorHasPendingOutput = false;
                    } catch (IOException e) {
                        throw new DataException(e);
                    }
                }

                /**
                 * Closes the generator and then the output stream. The stream is closed
                 * even when closing the generator fails, so it is never leaked.
                 */
                @Override
                public void close() {
                    try {
                        try {
                            writer.close();
                        } finally {
                            out.close();
                        }
                    } catch (IOException e) {
                        throw new DataException(e);
                    }
                }

                // Flushes the generator only when it may hold unflushed output, so that
                // streams consisting solely of Struct records incur no extra flushes.
                private void flushGeneratorIfNeeded() throws IOException {
                    if (generatorHasPendingOutput) {
                        writer.flush();
                        generatorHasPendingOutput = false;
                    }
                }
            };
        } catch (IOException e) {
            throw new DataException(e);
        }
    }