in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProvider.java [39:91]
/**
 * Creates a {@link RecordWriter} that serializes sink records as line-delimited JSON onto
 * {@code out}.
 *
 * <p>{@code Struct} values are converted to raw JSON bytes with {@code converter} and written
 * directly to {@code out}; every other value is serialized through a Jackson
 * {@link JsonGenerator} wrapping the same stream. Each written record is followed by a line
 * separator.
 *
 * @param filename name of the target file; used only for logging here
 * @param out      stream that receives the serialized records; closed by the returned writer's
 *                 {@code close()}
 * @return a writer bound to {@code out}
 * @throws DataException if the JSON generator cannot be created on {@code out}
 */
public RecordWriter getRecordWriter(final String filename, OutputStream out) {
    try {
        log.debug("Opening record writer for: {}", filename);
        return new RecordWriter() {
            // The root value separator is cleared; line separators are written explicitly per record.
            final JsonGenerator writer = mapper.getFactory()
                    .createGenerator(out)
                    .setRootValueSeparator(null);

            // True while the generator may hold output that has not yet been pushed to `out`.
            private boolean generatorHasPendingOutput = false;

            /**
             * Serializes one record followed by a line separator.
             *
             * @throws ConnectException if writing to the generator or the stream fails
             */
            @Override
            public void write(SinkRecord record) {
                log.trace("Sink record: {}", record);
                try {
                    Object value = record.value();
                    if (value instanceof Struct) {
                        byte[] rawJson = converter.fromConnectData(record.topic(), record.valueSchema(), value);
                        if (ArrayUtils.isEmpty(rawJson)) {
                            // Note: the whole record (not just its coordinates) is logged here.
                            log.warn("Filtering empty records post-serialization. Record filtered {}", record);
                        } else {
                            // The generator writes to the same stream; push anything it still
                            // holds first so records reach `out` in the order they were written.
                            if (generatorHasPendingOutput) {
                                writer.flush();
                                generatorHasPendingOutput = false;
                            }
                            out.write(rawJson);
                            out.write(LINE_SEPARATOR_BYTES);
                        }
                    } else {
                        writer.writeObject(value);
                        writer.writeRaw(LINE_SEPARATOR);
                        generatorHasPendingOutput = true;
                    }
                } catch (IOException e) {
                    throw new ConnectException(e);
                }
            }

            /**
             * Flushes the generator.
             *
             * @throws DataException if the flush fails
             */
            @Override
            public void commit() {
                try {
                    writer.flush();
                    generatorHasPendingOutput = false;
                } catch (IOException e) {
                    throw new DataException(e);
                }
            }

            /**
             * Closes the generator and then the underlying stream.
             *
             * @throws DataException if either close fails
             */
            @Override
            public void close() {
                try {
                    try {
                        writer.close();
                    } finally {
                        // Always release the stream, even when closing the generator fails.
                        out.close();
                    }
                } catch (IOException e) {
                    throw new DataException(e);
                }
            }
        };
    } catch (IOException e) {
        throw new DataException(e);
    }
}