in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java [173:195]
public List<Operation> getOperations(Event event) throws FlumeException {
  // Turns one event into a single-element list of operations:
  //   1. look up the reader for the schema that getSchema(event) resolves,
  //   2. binary-decode the event body into the reusable record,
  //   3. create an upsert or insert on `table`, chosen by `operation`
  //      (matched case-insensitively), and populate it via setupOp.
  // Throws FlumeException when the body cannot be read (the IOException is
  // kept as the cause) or when `operation` is neither "upsert" nor "insert".
  DatumReader<GenericRecord> datumReader = readers.getUnchecked(getSchema(event));

  // The previous decoder and record instances are handed back in so that
  // the factory/reader may reuse them instead of allocating new ones.
  decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
  try {
    reuse = datumReader.read(reuse, decoder);
  } catch (IOException e) {
    throw new FlumeException("Cannot deserialize event", e);
  }

  // Normalise with a fixed locale so matching does not depend on the
  // platform default.
  final String normalizedOperation = operation.toLowerCase(Locale.ENGLISH);
  final Operation tableOp;
  if ("upsert".equals(normalizedOperation)) {
    tableOp = table.newUpsert();
  } else if ("insert".equals(normalizedOperation)) {
    tableOp = table.newInsert();
  } else {
    // Report the operation exactly as configured, not the lower-cased form.
    throw new FlumeException(String.format("Unexpected operation %s", operation));
  }

  setupOp(tableOp, reuse);
  return Collections.singletonList(tableOp);
}