public boolean process()

in core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java [247:445]


    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            preCheckPoll(exchange);
        } catch (Exception e) {
            exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e));
            callback.done(true);
            return true;
        }

        // which consumer to use
        PollingConsumer consumer;
        Endpoint endpoint;

        // use dynamic endpoint so calculate the endpoint to use
        Object recipient = null;
        String staticUri = null;
        boolean prototype = cacheSize < 0;
        try {
            recipient = expression.evaluate(exchange, Object.class);
            if (dynamicAware != null) {
                // if its the same scheme as the pre-resolved dynamic aware then we can optimise to use it
                String originalUri = uri;
                String uri = resolveUri(exchange, recipient);
                String scheme = resolveScheme(exchange, uri);
                if (dynamicAware.getScheme().equals(scheme)) {
                    PollDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri, originalUri);
                    if (entry != null) {
                        staticUri = dynamicAware.resolveStaticUri(exchange, entry);
                        if (staticUri != null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Optimising poll via PollDynamicAware component: {} to use static uri: {}", scheme,
                                        URISupport.sanitizeUri(staticUri));
                            }
                        }
                    }
                }
            }
            Object targetRecipient = staticUri != null ? staticUri : recipient;
            targetRecipient = prepareRecipient(exchange, targetRecipient);
            if (targetRecipient == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Poll dynamic evaluated as null so cannot poll from any endpoint");
                }
                // no endpoint to send to, so ignore
                callback.done(true);
                return true;
            }
            Endpoint existing = getExistingEndpoint(exchange, targetRecipient);
            if (existing == null) {
                endpoint = resolveEndpoint(exchange, targetRecipient, prototype);
            } else {
                endpoint = existing;
                // we have an existing endpoint then its not a prototype scope
                prototype = false;
            }

            // acquire the consumer from the cache
            consumer = consumerCache.acquirePollingConsumer(endpoint);
        } catch (Exception e) {
            if (isIgnoreInvalidEndpoint()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e);
                }
            } else {
                exchange.setException(e);
            }
            callback.done(true);
            return true;
        }

        // grab the real delegate consumer that performs the actual polling
        final boolean bridgeErrorHandler = isBridgeErrorHandler(consumer);

        DynamicPollingConsumer dynamicConsumer = null;
        if (consumer instanceof DynamicPollingConsumer dyn) {
            dynamicConsumer = dyn;
        }

        Exchange resourceExchange;
        try {
            if (timeout < 0) {
                LOG.debug("Consumer receive: {}", consumer);
                resourceExchange = dynamicConsumer != null ? dynamicConsumer.receive(exchange) : consumer.receive();
            } else if (timeout == 0) {
                LOG.debug("Consumer receiveNoWait: {}", consumer);
                resourceExchange = dynamicConsumer != null ? dynamicConsumer.receiveNoWait(exchange) : consumer.receiveNoWait();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
                }
                resourceExchange
                        = dynamicConsumer != null ? dynamicConsumer.receive(exchange, timeout) : consumer.receive(timeout);
            }

            if (resourceExchange == null) {
                LOG.debug("Consumer received no exchange");
            } else {
                LOG.debug("Consumer received: {}", resourceExchange);
            }
        } catch (Exception e) {
            exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
            callback.done(true);
            return true;
        } finally {
            // return the consumer back to the cache
            consumerCache.releasePollingConsumer(endpoint, consumer);
            // and stop prototype endpoints
            if (prototype) {
                ServiceHelper.stopAndShutdownService(endpoint);
            }
        }

        // remember current redelivery stats
        Object redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED);
        Object redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER);
        Object redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER);

        // if we are bridging error handler and failed then remember the caused exception
        Throwable cause = null;
        if (resourceExchange != null && bridgeErrorHandler) {
            cause = resourceExchange.getException();
        }

        // if we should store the received message body in a variable,
        // then we need to preserve the original message body
        Object originalBody = null;
        Map<String, Object> originalHeaders = null;
        if (variableReceive != null) {
            try {
                originalBody = exchange.getMessage().getBody();
                // do a defensive copy of the headers
                originalHeaders = headersMapFactory.newMap(exchange.getMessage().getHeaders());
            } catch (Exception throwable) {
                exchange.setException(throwable);
                callback.done(true);
                return true;
            }
        }

        try {
            if (!isAggregateOnException() && resourceExchange != null && resourceExchange.isFailed()) {
                // copy resource exchange onto original exchange (preserving pattern)
                // and preserve redelivery headers
                copyResultsPreservePattern(exchange, resourceExchange);
            } else {
                prepareResult(exchange);

                // prepare the exchanges for aggregation
                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
                // must catch any exception from aggregation
                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                if (aggregatedExchange != null) {
                    if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) {
                        // result should be stored in variable instead of message body
                        ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive,
                                aggregatedExchange.getMessage());
                        aggregatedExchange.getMessage().setBody(originalBody);
                        aggregatedExchange.getMessage().setHeaders(originalHeaders);
                    }
                    // copy aggregation result onto original exchange (preserving pattern)
                    copyResultsPreservePattern(exchange, aggregatedExchange);
                    // handover any synchronization
                    if (resourceExchange != null) {
                        resourceExchange.getExchangeExtension().handoverCompletions(exchange);
                    }
                }
            }

            // if we failed then restore caused exception
            if (cause != null) {
                // restore caused exception
                exchange.setException(cause);
                // remove the exhausted marker as we want to be able to perform redeliveries with the error handler
                exchange.getExchangeExtension().setRedeliveryExhausted(false);

                // preserve the redelivery stats
                if (redelivered != null) {
                    exchange.getMessage().setHeader(Exchange.REDELIVERED, redelivered);
                }
                if (redeliveryCounter != null) {
                    exchange.getMessage().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
                }
                if (redeliveryMaxCounter != null) {
                    exchange.getMessage().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
                }
            }

            // set property with the uri of the endpoint enriched so we can use that for tracing etc
            exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());

        } catch (Exception e) {
            exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
            callback.done(true);
            return true;
        }

        callback.done(true);
        return true;
    }