in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java [119:156]
private void initialize(Event event) throws IOException {
Schema schema = null;
String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
if (schemaUrl != null) { // if URL_HEADER is there then use it
schema = schemaCache.get(schemaUrl);
if (schema == null) {
schema = loadFromUrl(schemaUrl);
schemaCache.put(schemaUrl, schema);
}
} else if (schemaString != null) { // fallback to LITERAL_HEADER if it was there
schema = new Schema.Parser().parse(schemaString);
} else if (staticSchemaURL != null) { // fallback to static url if it was there
schema = schemaCache.get(staticSchemaURL);
if (schema == null) {
schema = loadFromUrl(staticSchemaURL);
schemaCache.put(staticSchemaURL, schema);
}
} else { // no other options so giving up
throw new FlumeException("Could not find schema for event " + event);
}
writer = new GenericDatumWriter<Object>(schema);
dataFileWriter = new DataFileWriter<Object>(writer);
dataFileWriter.setSyncInterval(syncIntervalBytes);
try {
CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
dataFileWriter.setCodec(codecFactory);
} catch (AvroRuntimeException e) {
logger.warn("Unable to instantiate avro codec with name (" +
compressionCodec + "). Compression disabled. Exception follows.", e);
}
dataFileWriter.create(schema, out);
}