in processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java [70:104]
/**
 * Activates the processor: reads the configuration, creates and starts the Camel context,
 * and installs the callback route that forwards processed events to the dispatcher.
 *
 * <p>Recognized configuration keys (each falls back to a default when absent):
 * <ul>
 *   <li>{@code target.topics} - comma separated list of topics the processed events are
 *       posted to (default {@code decanter/process/camel})</li>
 *   <li>{@code callback.uri} - Camel endpoint URI consumed by the callback route
 *       (default {@code direct-vm:decanter-callback})</li>
 *   <li>{@code delegate.uri} - stored in {@code delegateUri}
 *       (default {@code direct-vm:decanter-delegate})</li>
 * </ul>
 *
 * @param configuration the processor configuration
 * @param bundleContext the bundle context; when {@code null} a plain
 *                      {@link DefaultCamelContext} is created and no service is registered
 * @throws Exception if the Camel context cannot be started or the route cannot be added
 */
public void activate(Dictionary<String, Object> configuration, BundleContext bundleContext) throws Exception {
    targetTopics = (configuration.get("target.topics") != null) ? configuration.get("target.topics").toString() : "decanter/process/camel";
    callbackUri = (configuration.get("callback.uri") != null) ? configuration.get("callback.uri").toString() : "direct-vm:decanter-callback";
    delegateUri = (configuration.get("delegate.uri") != null) ? configuration.get("delegate.uri").toString() : "direct-vm:decanter-delegate";
    if (bundleContext != null) {
        OsgiDefaultCamelContext osgiCamelContext = new OsgiDefaultCamelContext(bundleContext);
        // Bind the class resolver to the context being built here; the camelContext field
        // is only assigned below and must not be used at this point.
        osgiCamelContext.setClassResolver(new OsgiClassResolver(osgiCamelContext, bundleContext));
        osgiCamelContext.setDataFormatResolver(new OsgiDataFormatResolver(bundleContext));
        osgiCamelContext.setLanguageResolver(new OsgiLanguageResolver(bundleContext));
        camelContext = osgiCamelContext;
        serviceRegistration = bundleContext.registerService(CamelContext.class, camelContext, null);
    } else {
        camelContext = new DefaultCamelContext();
    }
    camelContext.start();
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from(callbackUri)
                    .id("decanter-processor-callback")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            Map<String, Object> body = exchange.getIn().getBody(Map.class);
                            // Tag the event so consumers can tell which processor handled it.
                            body.put("processor", "camel");
                            for (String topic : targetTopics.split(",")) {
                                // Tolerate whitespace around the separators ("a, b") and
                                // ignore empty entries (e.g. a trailing comma).
                                String trimmedTopic = topic.trim();
                                if (!trimmedTopic.isEmpty()) {
                                    dispatcher.postEvent(new Event(trimmedTopic, body));
                                }
                            }
                        }
                    }).end();
        }
    });
}