in components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java [75:289]
/**
 * Services an HTTP request using the servlet asynchronous API ("continuation").
 * <p>
 * This method is entered twice for a successfully processed request:
 * <ol>
 * <li>On the first pass no {@link Exchange} is stored on the request, so a new exchange is created, the request
 * is put in asynchronous mode and the exchange is handed to the consumer's asynchronous processor.</li>
 * <li>When the processor calls back, the exchange is stored on the request under
 * {@code EXCHANGE_ATTRIBUTE_NAME} and the request is re-dispatched; on that second pass the response is written
 * and the exchange is released.</li>
 * </ol>
 * Before any of that, the request is rejected with 404/405/415/503 where applicable, {@code OPTIONS} requests
 * are answered directly with an {@code Allow} header (unless the consumer handles OPTIONS itself), and the
 * request falls back to {@code super.doService} when continuation is disabled.
 *
 * @param  request   the incoming servlet request
 * @param  response  the servlet response to write to
 * @throws Exception an {@link IOException} raised while writing the response is rethrown as-is; any other
 *                   exception raised while writing the response is wrapped in a {@link CamelException}
 */
protected void handleDoService(final HttpServletRequest request, final HttpServletResponse response)
        throws Exception {
    // is there a consumer registered for the request.
    HttpConsumer consumer = getServletResolveConsumerStrategy().resolve(request, getConsumers());
    if (consumer == null) {
        // okay we cannot process this request so return either 404 or 405.
        // to know if its 405 then we need to check if any other HTTP method would have a consumer for the "same" request
        boolean hasAnyMethod = METHODS.stream()
                .anyMatch(m -> getServletResolveConsumerStrategy().isHttpMethodAllowed(request, m, getConsumers()));
        if (hasAnyMethod) {
            log.debug("No consumer to service request {} as method {} is not allowed", request, request.getMethod());
            sendError(response, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
        } else {
            log.debug("No consumer to service request {} as resource is not found", request);
            sendError(response, HttpServletResponse.SC_NOT_FOUND);
        }
        return;
    }

    // figure out if continuation is enabled and what timeout to use
    // (endpoint level configuration takes precedence over component level configuration)
    boolean useContinuation = false;
    Long continuationTimeout = null;
    HttpCommonEndpoint endpoint = consumer.getEndpoint();
    if (endpoint instanceof JettyHttpEndpoint) {
        JettyHttpEndpoint jettyEndpoint = (JettyHttpEndpoint) endpoint;
        Boolean epUseContinuation = jettyEndpoint.getUseContinuation();
        Long epContinuationTimeout = jettyEndpoint.getContinuationTimeout();
        if (epUseContinuation != null) {
            useContinuation = epUseContinuation;
        } else {
            useContinuation = jettyEndpoint.getComponent().isUseContinuation();
        }
        if (epContinuationTimeout != null) {
            continuationTimeout = epContinuationTimeout;
        } else {
            continuationTimeout = jettyEndpoint.getComponent().getContinuationTimeout();
        }
    }
    if (useContinuation) {
        log.trace("Start request with continuation timeout of {}",
                continuationTimeout != null ? continuationTimeout : "jetty default");
    } else {
        log.trace(
                "Usage of continuation is disabled, either by component or endpoint configuration, fallback to normal servlet processing instead");
        super.doService(request, response);
        return;
    }

    // if its an OPTIONS request then return which method is allowed
    if ("OPTIONS".equals(request.getMethod()) && !consumer.isOptionsEnabled()) {
        String allowedMethods = METHODS.stream()
                .filter(m -> getServletResolveConsumerStrategy().isHttpMethodAllowed(request, m, getConsumers()))
                .collect(Collectors.joining(","));
        // Collectors.joining never returns null; an empty string means no method matched,
        // so that is the condition on which to fall back
        if (allowedMethods.isEmpty()) {
            String restrict = consumer.getEndpoint().getHttpMethodRestrict();
            if (restrict != null) {
                allowedMethods = restrict;
            } else {
                // allow them all
                allowedMethods = "GET,HEAD,POST,PUT,DELETE,TRACE,OPTIONS,CONNECT,PATCH";
            }
        }
        if (!allowedMethods.contains("OPTIONS")) {
            allowedMethods = allowedMethods + ",OPTIONS";
        }
        response.addHeader("Allow", allowedMethods);
        response.setStatus(HttpServletResponse.SC_OK);
        return;
    }

    // reject methods that are not in the endpoint's restricted method list (case-insensitive match)
    if (consumer.getEndpoint().getHttpMethodRestrict() != null) {
        Iterator<?> it = ObjectHelper.createIterable(consumer.getEndpoint().getHttpMethodRestrict()).iterator();
        boolean match = false;
        while (it.hasNext()) {
            String method = it.next().toString();
            if (method.equalsIgnoreCase(request.getMethod())) {
                match = true;
                break;
            }
        }
        if (!match) {
            sendError(response, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
            return;
        }
    }

    if ("TRACE".equals(request.getMethod()) && !consumer.isTraceEnabled()) {
        sendError(response, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
        return;
    }

    // we do not support java serialized objects unless explicit enabled
    String contentType = request.getContentType();
    if (HttpConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT.equals(contentType)
            && !consumer.getEndpoint().getComponent().isAllowJavaSerializedObject()) {
        sendError(response, HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE);
        return;
    }

    // a completed exchange is stored on the request by the async callback below before re-dispatching
    final Exchange result = (Exchange) request.getAttribute(EXCHANGE_ATTRIBUTE_NAME);
    if (result == null) {
        // no asynchronous result so leverage continuation
        AsyncContext asyncContext = request.startAsync();
        if (isInitial(request) && continuationTimeout != null) {
            // set timeout on initial
            asyncContext.setTimeout(continuationTimeout.longValue());
        }
        asyncContext.addListener(new ExpiredListener(), request, response);

        // are we suspended and a request is dispatched initially?
        // NOTE(review): the async context has already been started at this point and is neither completed
        // nor dispatched on this path - confirm the container finishes the request after sendError
        if (consumer.isSuspended() && isInitial(request)) {
            sendError(response, HttpServletResponse.SC_SERVICE_UNAVAILABLE);
            return;
        }

        // a new request so create an exchange
        // must be prototype scoped (not pooled) so we create the exchange via endpoint
        final Exchange exchange = consumer.createExchange(false);
        exchange.setPattern(ExchangePattern.InOut);

        if (consumer.getEndpoint().isBridgeEndpoint()) {
            exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
            exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, Boolean.TRUE);
        }
        if (consumer.getEndpoint().isDisableStreamCache()) {
            exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
        }

        String charset = request.getCharacterEncoding();
        if (charset != null) {
            exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, charset);
        }

        // reuse existing http message if pooled
        Message msg = exchange.getIn();
        if (msg instanceof HttpMessage) {
            HttpMessage hm = (HttpMessage) msg;
            hm.init(exchange, endpoint, request, response);
        } else {
            exchange.setIn(new HttpMessage(exchange, endpoint, request, response));
        }

        // set context path as header
        String contextPath = consumer.getEndpoint().getPath();
        exchange.getIn().setHeader(JettyHttpConstants.SERVLET_CONTEXT_PATH, contextPath);

        updateHttpPath(exchange, contextPath);

        if (log.isTraceEnabled()) {
            log.trace("Suspending continuation of exchangeId: {}", exchange.getExchangeId());
        }
        request.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());

        // we want to handle the UoW
        UnitOfWork uow = exchange.getUnitOfWork();
        if (uow == null) {
            consumer.createUoW(exchange);
        } else if (uow.onPrepare(exchange)) {
            // need to re-attach uow
            exchange.getExchangeExtension().setUnitOfWork(uow);
        }

        ClassLoader oldTccl = overrideTccl(exchange);
        try {
            if (log.isTraceEnabled()) {
                log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
            }
            // use the asynchronous API to process the exchange
            consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
                @Override
                public void done(boolean doneSync) {
                    // check if the exchange id is already expired
                    boolean expired = expiredExchanges.remove(exchange.getExchangeId()) != null;
                    if (!expired) {
                        if (log.isTraceEnabled()) {
                            log.trace("Resuming continuation of exchangeId: {}", exchange.getExchangeId());
                        }
                        // resume processing after both, sync and async callbacks
                        request.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
                        asyncContext.dispatch();
                    } else {
                        log.warn("Cannot resume expired continuation of exchangeId: {}", exchange.getExchangeId());
                        consumer.releaseExchange(exchange, false);
                    }
                }
            });
        } finally {
            // always restore the class loader, also if process(...) throws, so the override
            // does not leak onto the calling thread
            if (oldTccl != null) {
                restoreTccl(exchange, oldTccl);
            }
        }

        // return to let Jetty continuation to work as it will resubmit and invoke the service
        // method again when its resumed
        return;
    }

    try {
        // now lets output to the response
        if (log.isTraceEnabled()) {
            log.trace("Resumed continuation and writing response for exchangeId: {}", result.getExchangeId());
        }
        Integer bs = consumer.getEndpoint().getResponseBufferSize();
        if (bs != null) {
            log.trace("Using response buffer size: {}", bs);
            response.setBufferSize(bs);
        }
        consumer.getBinding().writeResponse(result, response);
    } catch (IOException e) {
        log.error("Error processing request", e);
        throw e;
    } catch (Exception e) {
        log.error("Error processing request", e);
        throw new CamelException(e);
    } finally {
        // the exchange is finished regardless of whether the response could be written
        consumer.doneUoW(result);
        consumer.releaseExchange(result, false);
    }
}