protected void handleDoService()

in components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java [75:289]


    protected void handleDoService(final HttpServletRequest request, final HttpServletResponse response)
            throws Exception {

        // is there a consumer registered for the request.
        HttpConsumer consumer = getServletResolveConsumerStrategy().resolve(request, getConsumers());
        if (consumer == null) {
            // okay we cannot process this requires so return either 404 or 405.
            // to know if its 405 then we need to check if any other HTTP method would have a consumer for the "same" request
            boolean hasAnyMethod = METHODS.stream()
                    .anyMatch(m -> getServletResolveConsumerStrategy().isHttpMethodAllowed(request, m, getConsumers()));
            if (hasAnyMethod) {
                log.debug("No consumer to service request {} as method {} is not allowed", request, request.getMethod());
                sendError(response, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
                return;
            } else {
                log.debug("No consumer to service request {} as resource is not found", request);
                sendError(response, HttpServletResponse.SC_NOT_FOUND);
                return;
            }
        }

        // figure out if continuation is enabled and what timeout to use
        boolean useContinuation = false;
        Long continuationTimeout = null;
        HttpCommonEndpoint endpoint = consumer.getEndpoint();
        if (endpoint instanceof JettyHttpEndpoint) {
            JettyHttpEndpoint jettyEndpoint = (JettyHttpEndpoint) endpoint;
            Boolean epUseContinuation = jettyEndpoint.getUseContinuation();
            Long epContinuationTimeout = jettyEndpoint.getContinuationTimeout();
            if (epUseContinuation != null) {
                useContinuation = epUseContinuation;
            } else {
                useContinuation = jettyEndpoint.getComponent().isUseContinuation();
            }
            if (epContinuationTimeout != null) {
                continuationTimeout = epContinuationTimeout;
            } else {
                continuationTimeout = jettyEndpoint.getComponent().getContinuationTimeout();
            }
        }
        if (useContinuation) {
            log.trace("Start request with continuation timeout of {}",
                    continuationTimeout != null ? continuationTimeout : "jetty default");
        } else {
            log.trace(
                    "Usage of continuation is disabled, either by component or endpoint configuration, fallback to normal servlet processing instead");
            super.doService(request, response);
            return;
        }

        // if its an OPTIONS request then return which method is allowed
        if ("OPTIONS".equals(request.getMethod()) && !consumer.isOptionsEnabled()) {
            String allowedMethods = METHODS.stream()
                    .filter(m -> getServletResolveConsumerStrategy().isHttpMethodAllowed(request, m, getConsumers()))
                    .collect(Collectors.joining(","));
            if (allowedMethods == null && consumer.getEndpoint().getHttpMethodRestrict() != null) {
                allowedMethods = consumer.getEndpoint().getHttpMethodRestrict();
            }
            if (allowedMethods == null) {
                // allow them all
                allowedMethods = "GET,HEAD,POST,PUT,DELETE,TRACE,OPTIONS,CONNECT,PATCH";
            }
            if (!allowedMethods.contains("OPTIONS")) {
                allowedMethods = allowedMethods + ",OPTIONS";
            }
            response.addHeader("Allow", allowedMethods);
            response.setStatus(HttpServletResponse.SC_OK);
            return;
        }

        if (consumer.getEndpoint().getHttpMethodRestrict() != null) {
            Iterator<?> it = ObjectHelper.createIterable(consumer.getEndpoint().getHttpMethodRestrict()).iterator();
            boolean match = false;
            while (it.hasNext()) {
                String method = it.next().toString();
                if (method.equalsIgnoreCase(request.getMethod())) {
                    match = true;
                    break;
                }
            }
            if (!match) {
                sendError(response, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
                return;
            }
        }

        if ("TRACE".equals(request.getMethod()) && !consumer.isTraceEnabled()) {
            sendError(response, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
            return;
        }

        // we do not support java serialized objects unless explicit enabled
        String contentType = request.getContentType();
        if (HttpConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT.equals(contentType)
                && !consumer.getEndpoint().getComponent().isAllowJavaSerializedObject()) {
            sendError(response, HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE);
            return;
        }

        final Exchange result = (Exchange) request.getAttribute(EXCHANGE_ATTRIBUTE_NAME);
        if (result == null) {
            // no asynchronous result so leverage continuation
            AsyncContext asyncContext = request.startAsync();
            if (isInitial(request) && continuationTimeout != null) {
                // set timeout on initial
                asyncContext.setTimeout(continuationTimeout.longValue());
            }
            asyncContext.addListener(new ExpiredListener(), request, response);

            // are we suspended and a request is dispatched initially?
            if (consumer.isSuspended() && isInitial(request)) {
                sendError(response, HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                return;
            }

            // a new request so create an exchange
            // must be prototype scoped (not pooled) so we create the exchange via endpoint
            final Exchange exchange = consumer.createExchange(false);
            exchange.setPattern(ExchangePattern.InOut);

            if (consumer.getEndpoint().isBridgeEndpoint()) {
                exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
                exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, Boolean.TRUE);
            }
            if (consumer.getEndpoint().isDisableStreamCache()) {
                exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
            }

            String charset = request.getCharacterEncoding();
            if (charset != null) {
                exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, charset);
            }
            // reuse existing http message if pooled
            Message msg = exchange.getIn();
            if (msg instanceof HttpMessage) {
                HttpMessage hm = (HttpMessage) msg;
                hm.init(exchange, endpoint, request, response);
            } else {
                exchange.setIn(new HttpMessage(exchange, endpoint, request, response));
            }
            // set context path as header
            String contextPath = consumer.getEndpoint().getPath();
            exchange.getIn().setHeader(JettyHttpConstants.SERVLET_CONTEXT_PATH, contextPath);

            updateHttpPath(exchange, contextPath);

            if (log.isTraceEnabled()) {
                log.trace("Suspending continuation of exchangeId: {}", exchange.getExchangeId());
            }
            request.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());

            // we want to handle the UoW
            UnitOfWork uow = exchange.getUnitOfWork();
            if (uow == null) {
                consumer.createUoW(exchange);
            } else if (uow.onPrepare(exchange)) {
                // need to re-attach uow
                exchange.getExchangeExtension().setUnitOfWork(uow);
            }

            ClassLoader oldTccl = overrideTccl(exchange);

            if (log.isTraceEnabled()) {
                log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
            }
            // use the asynchronous API to process the exchange
            consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                    // check if the exchange id is already expired
                    boolean expired = expiredExchanges.remove(exchange.getExchangeId()) != null;
                    if (!expired) {
                        if (log.isTraceEnabled()) {
                            log.trace("Resuming continuation of exchangeId: {}", exchange.getExchangeId());
                        }
                        // resume processing after both, sync and async callbacks
                        request.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
                        asyncContext.dispatch();
                    } else {
                        log.warn("Cannot resume expired continuation of exchangeId: {}", exchange.getExchangeId());
                        consumer.releaseExchange(exchange, false);
                    }
                }
            });

            if (oldTccl != null) {
                restoreTccl(exchange, oldTccl);
            }

            // return to let Jetty continuation to work as it will resubmit and invoke the service
            // method again when its resumed
            return;
        }

        try {
            // now lets output to the response
            if (log.isTraceEnabled()) {
                log.trace("Resumed continuation and writing response for exchangeId: {}", result.getExchangeId());
            }
            Integer bs = consumer.getEndpoint().getResponseBufferSize();
            if (bs != null) {
                log.trace("Using response buffer size: {}", bs);
                response.setBufferSize(bs);
            }
            consumer.getBinding().writeResponse(result, response);
        } catch (IOException e) {
            log.error("Error processing request", e);
            throw e;
        } catch (Exception e) {
            log.error("Error processing request", e);
            throw new CamelException(e);
        } finally {
            consumer.doneUoW(result);
            consumer.releaseExchange(result, false);
        }
    }