in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java [103:130]
/**
 * Builds the single Kudu operation that stores the given event.
 *
 * <p>The row key is read from the event header named by {@code keyColumn} and written
 * as a string; the event body is written as binary into {@code payloadColumn}. The
 * configured {@code operation} is matched case-insensitively and selects between an
 * upsert and an insert.
 *
 * @param event the Flume event to convert
 * @return a one-element list containing the populated operation
 * @throws FlumeException if the event has no header for the key column, if the
 *     configured operation is neither "upsert" nor "insert", or if creating or
 *     populating the operation fails for any other reason
 */
public List<Operation> getOperations(Event event) throws FlumeException {
    String key = event.getHeaders().get(keyColumn);
    if (key == null) {
        throw new FlumeException(
            String.format("No value provided for key column %s", keyColumn));
    }
    try {
        Operation op;
        switch (operation.toLowerCase(Locale.ENGLISH)) {
            case "upsert":
                op = table.newUpsert();
                break;
            case "insert":
                op = table.newInsert();
                break;
            default:
                throw new FlumeException(
                    String.format("Unexpected operation %s", operation));
        }
        PartialRow row = op.getRow();
        row.addString(keyColumn, key);
        row.addBinary(payloadColumn, event.getBody());
        return Collections.singletonList(op);
    } catch (FlumeException e) {
        // Already a descriptive FlumeException (e.g. the unexpected-operation error
        // above); rethrow untouched so its message is not buried under the generic
        // wrapper below.
        throw e;
    } catch (Exception e) {
        throw new FlumeException("Failed to create Kudu Operation object", e);
    }
}