in modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java [41:75]
public void init(Context context) throws Exception {
  // Sets up this observer for the export queue named by the "queueId" observer property:
  // loads and initializes the configured Exporter, then builds the ExportObserverImpl that
  // forwards exports to it. Any failure while loading, constructing or initializing the
  // exporter propagates to the caller.
  queueId = context.getObserverConfiguration().getString("queueId");
  ExportQueue.Options opts = new ExportQueue.Options(queueId, context.getAppConfiguration());

  // TODO defer loading classes... so that not done during fluo init
  // TODO move class loading to centralized place... also attempt to check type params
  // Instantiate through the no-arg constructor instead of the deprecated Class.newInstance(),
  // which rethrows checked constructor exceptions without declaring them. With this form a
  // failing constructor surfaces as InvocationTargetException carrying the original cause.
  @SuppressWarnings("rawtypes")
  Exporter exporter = getClass().getClassLoader().loadClass(opts.fluentCfg.exporterType)
      .asSubclass(Exporter.class).getDeclaredConstructor().newInstance();

  SimpleSerializer serializer = SimpleSerializer.getInstance(context.getAppConfiguration());

  // Give the exporter a view of this observer's queue id, its exporter-specific
  // configuration, and the observer context that was passed to this method.
  exporter.init(new Exporter.Context() {
    @Override
    public String getQueueId() {
      return queueId;
    }

    @Override
    public SimpleConfiguration getExporterConfiguration() {
      return opts.getExporterConfiguration();
    }

    @Override
    public Context getObserverContext() {
      return context;
    }
  });

  // The implementation object receives the exporter's processExports method as its callback.
  this.eoi =
      new ExportObserverImpl<K, V>(queueId, opts.fluentCfg, serializer, exporter::processExports);
}