in components/camel-directvm/src/main/java/org/apache/camel/karaf/component/directvm/DirectVmProducer.java [42:101]
public boolean process(Exchange exchange, AsyncCallback callback) {
// send to consumer
DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
if (consumer == null) {
if (endpoint.isFailIfNoConsumers()) {
exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
} else {
LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
}
callback.done(true);
return true;
}
try {
final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy();
// only clone the Exchange if we actually need to filter out properties or headers
final Exchange submitted = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange;
// clear properties in the copy if we are not propagating them
if (!endpoint.isPropagateProperties()) {
submitted.getProperties().clear();
}
// filter headers by Header Filter Strategy if there is one set
if (headerFilterStrategy != null) {
submitted.getIn().getHeaders().entrySet().removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted));
}
return consumer.getAsyncProcessor().process(submitted, done -> {
try {
Message msg = submitted.getMessage();
if (headerFilterStrategy != null) {
msg.getHeaders().entrySet().removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), submitted));
}
if (exchange != submitted) {
// only need to copy back if they are different
exchange.setException(submitted.getException());
exchange.getOut().copyFrom(msg);
}
if (endpoint.isPropagateProperties()) {
exchange.getProperties().putAll(submitted.getProperties());
}
} catch (Exception e) {
exchange.setException(e);
} finally {
callback.done(true);
}
});
} catch (Exception e) {
exchange.setException(e);
}
callback.done(true);
return true;
}