in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java [255:301]
/**
 * Turns a Flume event into Kudu operations, producing one operation for every
 * occurrence of {@code pattern} found in the event body.
 *
 * <p>The body is decoded with {@code charset}. For each match, a fresh insert or
 * upsert (selected by {@code operation}) is created on {@code table}, and each
 * column of the table schema is populated from the capture group that carries the
 * column's name. Per-column problems are passed to {@code logOrThrow} together with
 * the matching policy; if no match is found at all, {@code logOrThrow} is invoked
 * with {@code unmatchedRowPolicy}.
 *
 * @param event the event whose body is parsed
 * @return the operations that were built, in match order; empty when the body did
 *         not match and {@code logOrThrow} returned normally
 * @throws FlumeException if the operation type is not recognized, or if populating a
 *         column fails with anything other than an {@link IllegalArgumentException}
 *         (presumably also when {@code logOrThrow} decides to throw; its body is
 *         not visible here)
 */
public List<Operation> getOperations(Event event) throws FlumeException {
  String body = new String(event.getBody(), charset);
  Matcher matcher = pattern.matcher(body);
  Schema tableSchema = table.getSchema();
  List<Operation> result = Lists.newArrayList();

  while (matcher.find()) {
    Operation current = newOperationForConfiguredType();
    fillRowFromMatch(matcher, tableSchema, body, current.getRow());
    result.add(current);
  }

  // Every completed iteration adds exactly one operation, so an empty list
  // means the pattern never matched.
  if (result.isEmpty()) {
    String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, body);
    logOrThrow(unmatchedRowPolicy, msg, null);
  }
  return result;
}

/** Creates a new, empty operation on {@code table} according to {@code operation}. */
private Operation newOperationForConfiguredType() {
  switch (operation) {
    case UPSERT:
      return table.newUpsert();
    case INSERT:
      return table.newInsert();
    default:
      throw new FlumeException(
          String.format("Unrecognized operation type '%s' in getOperations(): " +
              "this should never happen!", operation));
  }
}

/**
 * Sets every column of {@code tableSchema} on {@code target} from the identically
 * named capture group of the current match of {@code matcher}.
 */
private void fillRowFromMatch(Matcher matcher, Schema tableSchema, String body,
                              PartialRow target) {
  for (ColumnSchema column : tableSchema.getColumns()) {
    String columnName = column.getName();
    try {
      coerceAndSet(matcher.group(columnName), columnName, column.getType(), target);
    } catch (NumberFormatException e) {
      // Must come before IllegalArgumentException, which is its supertype.
      String msg = String.format(
          "Raw value '%s' couldn't be parsed to type %s for column '%s'",
          body, column.getType(), columnName);
      logOrThrow(badColumnValuePolicy, msg, e);
    } catch (IllegalArgumentException e) {
      // Matcher.group(String) throws this when the pattern has no group with the
      // given name; any other IllegalArgumentException raised inside the try is
      // reported the same way.
      String msg = String.format(
          "Column '%s' has no matching group in '%s'",
          columnName, body);
      logOrThrow(missingColumnPolicy, msg, e);
    } catch (Exception e) {
      throw new FlumeException("Failed to create Kudu operation", e);
    }
  }
}