in flume-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java [80:113]
public void configure(Context context) {
String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
String morphlineId = context.getString(MORPHLINE_ID_PARAM);
if (morphlineFile == null || morphlineFile.trim().length() == 0) {
throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
}
morphlineFileAndId = morphlineFile + "@" + morphlineId;
if (morphlineContext == null) {
FaultTolerance faultTolerance = new FaultTolerance(
context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false),
context.getBoolean(FaultTolerance.IS_IGNORING_RECOVERABLE_EXCEPTIONS, false),
context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES));
morphlineContext = new MorphlineContext.Builder()
.setExceptionHandler(faultTolerance)
.setMetricRegistry(SharedMetricRegistries.getOrCreate(morphlineFileAndId))
.build();
}
Config override = ConfigFactory.parseMap(
context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
morphline = new Compiler().compile(
new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);
this.mappingTimer = morphlineContext.getMetricRegistry().timer(
MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
this.numRecords = morphlineContext.getMetricRegistry().meter(
MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS));
this.numFailedRecords = morphlineContext.getMetricRegistry().meter(
MetricRegistry.name("morphline.app", "numFailedRecords"));
this.numExceptionRecords = morphlineContext.getMetricRegistry().meter(
MetricRegistry.name("morphline.app", "numExceptionRecords"));
}