in core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java [247:445]
public boolean process(Exchange exchange, AsyncCallback callback) {
    // Polls a single exchange from an endpoint computed from the expression and merges it
    // into the given exchange through the aggregation strategy.
    //
    // Every exit path invokes callback.done(true) and returns true. Failures are not thrown
    // to the caller; they are reported by setting an exception on the exchange.

    // step 1: pre-check, any failure aborts the poll
    try {
        preCheckPoll(exchange);
    } catch (Exception e) {
        exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e));
        callback.done(true);
        return true;
    }

    // step 2: work out which endpoint to poll and acquire a consumer for it
    PollingConsumer pollingConsumer;
    Endpoint target;
    Object recipient = null;
    // with a negative cache size the endpoint is resolved with the prototype flag set,
    // and is stopped again once the poll has finished
    boolean stopAfterPoll = cacheSize < 0;
    try {
        recipient = expression.evaluate(exchange, Object.class);

        String optimisedUri = null;
        if (dynamicAware != null) {
            // read the configured uri (the field) before resolving the uri for this exchange
            String configuredUri = uri;
            String resolvedUri = resolveUri(exchange, recipient);
            String scheme = resolveScheme(exchange, resolvedUri);
            // only when the scheme matches the pre-resolved dynamic aware can it optimise the uri
            if (dynamicAware.getScheme().equals(scheme)) {
                PollDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, resolvedUri, configuredUri);
                if (entry != null) {
                    optimisedUri = dynamicAware.resolveStaticUri(exchange, entry);
                    if (optimisedUri != null && LOG.isDebugEnabled()) {
                        LOG.debug("Optimising poll via PollDynamicAware component: {} to use static uri: {}", scheme,
                                URISupport.sanitizeUri(optimisedUri));
                    }
                }
            }
        }

        // the static uri (when available) takes precedence over the evaluated recipient
        Object targetRecipient = prepareRecipient(exchange, optimisedUri != null ? optimisedUri : recipient);
        if (targetRecipient == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Poll dynamic evaluated as null so cannot poll from any endpoint");
            }
            // nothing to poll from, so leave the exchange untouched
            callback.done(true);
            return true;
        }

        Endpoint existing = getExistingEndpoint(exchange, targetRecipient);
        if (existing != null) {
            // an already existing endpoint is never treated as prototype scoped
            target = existing;
            stopAfterPoll = false;
        } else {
            target = resolveEndpoint(exchange, targetRecipient, stopAfterPoll);
        }

        // the consumer is borrowed from the cache and handed back after the poll
        pollingConsumer = consumerCache.acquirePollingConsumer(target);
    } catch (Exception e) {
        if (!isIgnoreInvalidEndpoint()) {
            exchange.setException(e);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e);
        }
        callback.done(true);
        return true;
    }

    // step 3: perform the poll
    final boolean bridgeErrorHandler = isBridgeErrorHandler(pollingConsumer);
    // dynamic consumers are given the current exchange when polling
    DynamicPollingConsumer dynamicConsumer = pollingConsumer instanceof DynamicPollingConsumer dyn ? dyn : null;

    Exchange polled;
    try {
        if (timeout > 0) {
            // positive timeout: wait at most that many milliseconds
            if (LOG.isDebugEnabled()) {
                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, pollingConsumer);
            }
            if (dynamicConsumer != null) {
                polled = dynamicConsumer.receive(exchange, timeout);
            } else {
                polled = pollingConsumer.receive(timeout);
            }
        } else if (timeout == 0) {
            // zero timeout: use the no-wait variant
            LOG.debug("Consumer receiveNoWait: {}", pollingConsumer);
            if (dynamicConsumer != null) {
                polled = dynamicConsumer.receiveNoWait(exchange);
            } else {
                polled = pollingConsumer.receiveNoWait();
            }
        } else {
            // negative timeout: use the receive variant without a timeout argument
            LOG.debug("Consumer receive: {}", pollingConsumer);
            if (dynamicConsumer != null) {
                polled = dynamicConsumer.receive(exchange);
            } else {
                polled = pollingConsumer.receive();
            }
        }

        if (polled != null) {
            LOG.debug("Consumer received: {}", polled);
        } else {
            LOG.debug("Consumer received no exchange");
        }
    } catch (Exception e) {
        exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
        callback.done(true);
        return true;
    } finally {
        // always hand the consumer back to the cache
        consumerCache.releasePollingConsumer(target, pollingConsumer);
        // prototype endpoints are stopped after use
        if (stopAfterPoll) {
            ServiceHelper.stopAndShutdownService(target);
        }
    }

    // step 4: snapshot state of the original exchange before it is modified by the aggregation
    Object redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED);
    Object redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER);
    Object redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER);

    // when bridging the error handler, keep the exception of the polled exchange for later
    Throwable cause = bridgeErrorHandler && polled != null ? polled.getException() : null;

    // when the result goes into a variable, the original body and headers must be put back afterwards
    Object originalBody = null;
    Map<String, Object> originalHeaders = null;
    if (variableReceive != null) {
        try {
            originalBody = exchange.getMessage().getBody();
            // copy the headers into a new map
            originalHeaders = headersMapFactory.newMap(exchange.getMessage().getHeaders());
        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    // step 5: merge the polled exchange into the original exchange
    try {
        boolean copyFailureOnly = !isAggregateOnException() && polled != null && polled.isFailed();
        if (copyFailureOnly) {
            // the poll failed and failures are not aggregated: copy the polled exchange as-is
            copyResultsPreservePattern(exchange, polled);
        } else {
            prepareResult(exchange);
            ExchangeHelper.prepareAggregation(exchange, polled);
            Exchange aggregated = aggregationStrategy.aggregate(exchange, polled);
            if (aggregated != null) {
                if (ExchangeHelper.shouldSetVariableResult(aggregated, variableReceive)) {
                    // store the result in the variable and restore the original body and headers
                    ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregated, variableReceive,
                            aggregated.getMessage());
                    aggregated.getMessage().setBody(originalBody);
                    aggregated.getMessage().setHeaders(originalHeaders);
                }
                copyResultsPreservePattern(exchange, aggregated);
                // completions of the polled exchange are handed over to the original exchange
                if (polled != null) {
                    polled.getExchangeExtension().handoverCompletions(exchange);
                }
            }
        }

        if (cause != null) {
            // put back the remembered exception and clear the exhausted marker,
            // so the error handler is able to perform redeliveries
            exchange.setException(cause);
            exchange.getExchangeExtension().setRedeliveryExhausted(false);
            // put back the redelivery headers captured before the aggregation
            if (redelivered != null) {
                exchange.getMessage().setHeader(Exchange.REDELIVERED, redelivered);
            }
            if (redeliveryCounter != null) {
                exchange.getMessage().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
            }
            if (redeliveryMaxCounter != null) {
                exchange.getMessage().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
            }
        }

        // record the uri of the polled endpoint on the exchange (used for tracing etc)
        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, pollingConsumer.getEndpoint().getEndpointUri());
    } catch (Exception e) {
        exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
    }

    // success and aggregation failure complete the callback the same way
    callback.done(true);
    return true;
}