in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java [197:252]
/**
 * Populates the row of {@code op} from {@code record}, visiting every column of the
 * table schema and looking the value up in the record by column name.
 *
 * <p>A {@code null} record value is written as an explicit null when the column is
 * nullable; otherwise the column is left unset so that a Kudu default can apply.
 *
 * <p>Avro has no 8- or 16-bit integer types, so INT8 and INT16 columns accept any boxed
 * integral value ({@code Byte}, {@code Short}, {@code Integer} or {@code Long}) as long as
 * it fits in the column type. BINARY columns accept either a {@code byte[]} or a
 * {@code java.nio.ByteBuffer}.
 *
 * @param op the operation whose row is filled in
 * @param record the record supplying the column values
 * @throws FlumeException if a value cannot be coerced to its column's type, if an integral
 *     value is out of range for an INT8/INT16 column, or if the column type is not handled
 */
private void setupOp(Operation op, GenericRecord record) {
  PartialRow row = op.getRow();
  for (ColumnSchema col : table.getSchema().getColumns()) {
    String name = col.getName();
    Object value = record.get(name);
    if (value == null) {
      // Set null if nullable, otherwise leave unset for possible Kudu default.
      if (col.isNullable()) {
        row.setNull(name);
      }
      continue;
    }
    try {
      switch (col.getType()) {
        case BOOL:
          row.addBoolean(name, (boolean) value);
          break;
        case INT8:
          // A direct (byte) cast of an Object only succeeds for a boxed Byte, so widen
          // through long and narrow after a range check instead.
          row.addByte(name, (byte) toIntegralInRange(value, col, Byte.MIN_VALUE, Byte.MAX_VALUE));
          break;
        case INT16:
          // Same reasoning as INT8: a direct (short) cast would reject Integer/Long values.
          row.addShort(name,
              (short) toIntegralInRange(value, col, Short.MIN_VALUE, Short.MAX_VALUE));
          break;
        case INT32:
          row.addInt(name, (int) value);
          break;
        case INT64: // Fall through
        case UNIXTIME_MICROS:
          row.addLong(name, (long) value);
          break;
        case FLOAT:
          row.addFloat(name, (float) value);
          break;
        case DOUBLE:
          row.addDouble(name, (double) value);
          break;
        case STRING:
          row.addString(name, value.toString());
          break;
        case BINARY:
          row.addBinary(name, toByteArray(value));
          break;
        default:
          throw new FlumeException(String.format(
              "Unrecognized type %s for column %s", col.getType().toString(), name));
      }
    } catch (ClassCastException e) {
      throw new FlumeException(
          String.format("Failed to coerce value for column '%s' to type %s",
              col.getName(),
              col.getType()), e);
    }
  }
}

/**
 * Converts a boxed integral value to a {@code long} and verifies it lies in
 * {@code [min, max]}, so the caller can narrow it without silent truncation.
 *
 * @throws ClassCastException if {@code value} is not a Byte, Short, Integer or Long
 * @throws FlumeException if the value is outside {@code [min, max]}
 */
private long toIntegralInRange(Object value, ColumnSchema col, long min, long max) {
  boolean integral = value instanceof Byte || value instanceof Short
      || value instanceof Integer || value instanceof Long;
  if (!integral) {
    // Reported as a ClassCastException so the caller wraps it exactly like any other
    // failed coercion; floating-point values are rejected rather than truncated.
    throw new ClassCastException(
        value.getClass().getName() + " is not an integral type");
  }
  long widened = ((Number) value).longValue();
  if (widened < min || widened > max) {
    throw new FlumeException(String.format(
        "Value %d for column '%s' is out of range for type %s",
        widened, col.getName(), col.getType()));
  }
  return widened;
}

/**
 * Returns the bytes of a binary value given as either a {@code byte[]} or a
 * {@code java.nio.ByteBuffer}.
 *
 * @throws ClassCastException if {@code value} is neither of the supported types
 */
private byte[] toByteArray(Object value) {
  if (value instanceof java.nio.ByteBuffer) {
    // Read through a duplicate so the source buffer's position is untouched, and copy
    // into a fresh array so the result does not share storage with the buffer.
    java.nio.ByteBuffer view = ((java.nio.ByteBuffer) value).duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }
  return (byte[]) value;
}