in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterProvider.java [26:72]
/**
 * Creates a {@link RecordWriter} that serializes sink records to {@code out} as an Avro
 * container file.
 *
 * <p>The underlying Avro writer is created lazily on the first {@code write} call, using the
 * Avro schema converted from that first record's value schema. The same Connect schema is then
 * used to convert the value of every subsequent record.
 *
 * @param filename name used only in the debug log message emitted when the writer is opened
 * @param out stream the Avro container file is written to
 * @return a writer whose {@code write} appends one record, {@code commit} flushes the Avro
 *     writer and {@code close} closes it
 */
public RecordWriter getRecordWriter(String filename, OutputStream out) {
    return new RecordWriter() {
        final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
        Schema schema;
        // Set only after writer.create(...) has succeeded. A dedicated flag is used instead of
        // testing `schema == null` because valueSchema() may be null for schemaless records
        // (which would re-run create on every write), and because a failed create must not be
        // mistaken for an initialized writer on the next call.
        boolean opened;

        /**
         * Appends the record's value to the Avro file, opening the file first if needed.
         *
         * @throws IOException if appending to the Avro writer fails
         * @throws ConnectException if creating the Avro file fails with an I/O error
         */
        @Override
        public void write(SinkRecord record) throws IOException {
            if (!opened) {
                Schema valueSchema = record.valueSchema();
                try {
                    log.debug("Opening record writer for: {}", filename);
                    org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(valueSchema);
                    writer.setFlushOnEveryBlock(true);
                    writer.create(avroSchema, out);
                } catch (IOException e) {
                    throw new ConnectException(e);
                }
                // Record the schema and the opened state only once creation has succeeded.
                schema = valueSchema;
                opened = true;
            }
            Object value = avroData.fromConnectData(schema, record.value());
            // AvroData wraps primitive types so their schema can be included. We need to unwrap
            // NonRecordContainers to just their value to properly handle these types
            if (value instanceof NonRecordContainer) {
                writer.append(((NonRecordContainer) value).getValue());
            } else {
                writer.append(value);
            }
        }

        /**
         * Closes the Avro writer.
         *
         * @throws DataException if closing fails with an I/O error
         */
        @Override
        public void close() {
            try {
                writer.close();
            } catch (IOException e) {
                throw new DataException(e);
            }
        }

        /**
         * Flushes the Avro writer.
         *
         * @throws DataException if flushing fails with an I/O error
         */
        @Override
        public void commit() {
            // NOTE(review): this flushes unconditionally, even if no record was ever written and
            // the Avro file was therefore never created - confirm callers only commit after a
            // successful write, or that Avro tolerates a flush on an unopened writer.
            try {
                writer.flush();
            } catch (IOException e) {
                throw new DataException(e);
            }
        }
    };
}