in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java [254:277]
private Schema getSchema(Event event) throws FlumeException {
Map<String, String> headers = event.getHeaders();
String schemaUrl = headers.get(SCHEMA_URL_HEADER);
String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
try {
if (schemaUrl != null) {
return schemasFromURL.get(schemaUrl);
} else if (schemaLiteral != null) {
return schemasFromLiteral.get(schemaLiteral);
} else if (defaultSchemaUrl != null) {
return schemasFromURL.get(defaultSchemaUrl);
} else {
throw new FlumeException(
String.format("No schema for event. " +
"Specify configuration property '%s' or event header '%s'",
SCHEMA_PROP,
SCHEMA_URL_HEADER));
}
} catch (ExecutionException e) {
throw new FlumeException("Cannot get schema", e);
} catch (RuntimeException e) {
throw new FlumeException("Cannot parse schema", e);
}
}