in core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java [98:407]
/**
 * Creates the runtime {@link Route} from the route definition held by this reifier.
 * <p>
 * The work is done in a fixed order:
 * <ol>
 * <li>resolve the input endpoint and create the route via the route factory</li>
 * <li>apply the per-route options from the definition (error handler factory, variable receive, tracing,
 * message history, log mask, stream caching, delayer, shutdown options, intercept strategies)</li>
 * <li>notify the lifecycle strategies that the route context has been created</li>
 * <li>validate that the route has outputs and reify each output into the route</li>
 * <li>combine the event driven processors into a {@code RoutePipeline} and wrap it in an internal
 * processor, on which the advices are added (unit of work, route policies, inflight tracking, management,
 * route lifecycle, rest binding, contract, variable)</li>
 * <li>set the processor, properties, startup order and auto startup on the route, initialize the route
 * policies and prepare the error handler aware processors</li>
 * </ol>
 *
 * @return the created route, or {@code null} if reifying the outputs did not produce any event driven
 *         processors
 * @throws FailedToCreateRouteException if the route has no output processors, or if reifying one of the
 *                                      outputs fails
 * @throws Exception                    if any other step of the route creation fails
 */
protected Route doCreateRoute() throws Exception {
// resolve endpoint
// order of preference: an endpoint instance set on the input, then an endpoint consumer builder,
// and as last resort the endpoint uri
Endpoint endpoint = definition.getInput().getEndpoint();
if (endpoint == null) {
EndpointConsumerResolver def = definition.getInput().getEndpointConsumerBuilder();
if (def != null) {
endpoint = def.resolve(camelContext);
} else {
endpoint = resolveEndpoint(definition.getInput().getEndpointUri());
}
}
// create route
// the route id is taken from the definition, or created using the NodeIdFactory plugin
String id = definition.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class));
String desc = definition.getDescriptionText();
Route route = PluginHelper.getRouteFactory(camelContext).createRoute(camelContext, definition, id,
desc, endpoint, definition.getResource());
// configure error handler
route.setErrorHandlerFactory(definition.getErrorHandlerFactory());
// configure variable
// the variable is parsed here, but the advice that uses it is added much later,
// after the internal processor has been created
String variable = parseString(definition.getInput().getVariableReceive());
if (variable != null) {
// when using variable we need to turn on original message
route.setAllowUseOriginalMessage(true);
}
// configure tracing
if (definition.getTrace() != null) {
Boolean isTrace = parseBoolean(definition.getTrace());
if (isTrace != null) {
route.setTracing(isTrace);
if (isTrace) {
LOG.debug("Tracing is enabled on route: {}", definition.getId());
// tracing is added in the DefaultChannel so we can enable
// it on the fly
}
}
}
// configure message history
if (definition.getMessageHistory() != null) {
Boolean isMessageHistory = parseBoolean(definition.getMessageHistory());
if (isMessageHistory != null) {
route.setMessageHistory(isMessageHistory);
if (isMessageHistory) {
LOG.debug("Message history is enabled on route: {}", definition.getId());
}
}
}
// configure Log EIP mask
if (definition.getLogMask() != null) {
Boolean isLogMask = parseBoolean(definition.getLogMask());
if (isLogMask != null) {
route.setLogMask(isLogMask);
if (isLogMask) {
LOG.debug("Security mask for Logging is enabled on route: {}", definition.getId());
}
}
}
// configure stream caching
if (definition.getStreamCache() != null) {
Boolean isStreamCache = parseBoolean(definition.getStreamCache());
if (isStreamCache != null) {
route.setStreamCaching(isStreamCache);
if (isStreamCache) {
LOG.debug("StreamCaching is enabled on route: {}", definition.getId());
}
}
}
// configure delayer
if (definition.getDelayer() != null) {
Long delayer = parseDuration(definition.getDelayer());
if (delayer != null) {
// the parsed value is always set on the route; the sign only decides which message is logged
route.setDelayer(delayer);
if (delayer > 0) {
LOG.debug("Delayer is enabled with: {} ms. on route: {}", delayer, definition.getId());
} else {
LOG.debug("Delayer is disabled on route: {}", definition.getId());
}
}
}
// configure auto startup
// (only captured here; it is applied to the route at the end, after the processor has been set)
Boolean isAutoStartup = parseBoolean(definition.getAutoStartup());
// configure startup order
// (only captured here; it is applied to the route at the end, and may be null)
Integer startupOrder = definition.getStartupOrder();
// configure shutdown
if (definition.getShutdownRoute() != null) {
LOG.debug("Using ShutdownRoute {} on route: {}", definition.getShutdownRoute(), definition.getId());
route.setShutdownRoute(parse(ShutdownRoute.class, definition.getShutdownRoute()));
}
if (definition.getShutdownRunningTask() != null) {
LOG.debug("Using ShutdownRunningTask {} on route: {}", definition.getShutdownRunningTask(), definition.getId());
route.setShutdownRunningTask(parse(ShutdownRunningTask.class, definition.getShutdownRunningTask()));
}
// should inherit the intercept strategies we have defined
route.getInterceptStrategies().addAll(definition.getInterceptStrategies());
// notify route context created
for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
strategy.onRouteContextCreate(route);
}
// validate route has output processors
if (!hasOutputs(definition.getOutputs(), true)) {
String at = definition.getInput().toString();
Exception cause = new IllegalArgumentException(
"Route " + definition.getId() + " has no output processors."
+ " You need to add outputs to the route such as to(\"log:foo\").");
throw new FailedToCreateRouteException(definition.getId(), definition.toString(), at, cause);
}
// iterate over a copy of the outputs
// NOTE(review): presumably so the definition's outputs can change while reifying without
// affecting this loop - confirm
List<ProcessorDefinition<?>> list = new ArrayList<>(definition.getOutputs());
for (ProcessorDefinition<?> output : list) {
try {
ProcessorReifier<?> reifier = ProcessorReifier.reifier(route, output);
// ensure node has id assigned
String outputId
= output.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class));
// the EIP name is the reifier class name without the "Reifier" part, and is only used in the startup step description
String eip = reifier.getClass().getSimpleName().replace("Reifier", "");
StartupStep step = camelContext.getCamelContextExtension().getStartupStepRecorder()
.beginStep(ProcessorReifier.class, outputId, "Create " + eip + " Processor");
reifier.addRoutes();
// the step is only ended when addRoutes succeeded; on failure the exception is wrapped below
// and endStep is not called
camelContext.getCamelContextExtension().getStartupStepRecorder().endStep(step);
} catch (Exception e) {
throw new FailedToCreateRouteException(definition.getId(), definition.toString(), output.toString(), e);
}
}
// now lets turn all the event driven consumer processors into a single route
List<Processor> eventDrivenProcessors = route.getEventDrivenProcessors();
if (eventDrivenProcessors.isEmpty()) {
// nothing to route to, so no route is created (callers must be able to handle null)
return null;
}
// Set route properties
// (computed here, and put on the route after the processor has been set)
Map<String, Object> routeProperties = computeRouteProperties();
// always use a pipeline even if there are only 1 processor as the pipeline
// handles preparing the response from the exchange in regard to IN vs OUT messages etc
RoutePipeline target = new RoutePipeline(camelContext, eventDrivenProcessors);
target.setRouteId(id);
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
InternalProcessor internal = PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, target, route);
// configure route policy
// policies are collected from three sources: the policies set on the definition,
// the comma separated references on the definition, and the factories on the camel context
if (definition.getRoutePolicies() != null && !definition.getRoutePolicies().isEmpty()) {
for (RoutePolicy policy : definition.getRoutePolicies()) {
LOG.debug("RoutePolicy is enabled: {} on route: {}", policy, definition.getId());
route.getRoutePolicyList().add(policy);
}
}
if (definition.getRoutePolicyRef() != null) {
// the reference may hold multiple names separated by comma, each must be resolvable
StringTokenizer policyTokens = new StringTokenizer(definition.getRoutePolicyRef(), ",");
while (policyTokens.hasMoreTokens()) {
String ref = policyTokens.nextToken().trim();
RoutePolicy policy = mandatoryLookup(ref, RoutePolicy.class);
LOG.debug("RoutePolicy is enabled: {} on route: {}", policy, definition.getId());
route.getRoutePolicyList().add(policy);
}
}
if (camelContext.getRoutePolicyFactories() != null) {
for (RoutePolicyFactory factory : camelContext.getRoutePolicyFactories()) {
// a factory may return null, in which case it contributes no policy to this route
RoutePolicy policy = factory.createRoutePolicy(camelContext, definition.getId(), definition);
if (policy != null) {
LOG.debug("RoutePolicy is enabled: {} on route: {}", policy, definition.getId());
route.getRoutePolicyList().add(policy);
}
}
}
// and then optionally add route policy processor if a custom policy is set
List<RoutePolicy> routePolicyList = route.getRoutePolicyList();
if (routePolicyList != null && !routePolicyList.isEmpty()) {
for (RoutePolicy policy : routePolicyList) {
// add policy as service if we have not already done that (eg possible if two routes have the same service)
// this ensures Camel can control the lifecycle of the policy
// (the route is only injected into policies that are not already registered as a service)
if (!camelContext.hasService(policy)) {
try {
// inject route
if (policy instanceof RouteAware ra) {
ra.setRoute(route);
}
camelContext.addService(policy);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
}
}
internal.addRoutePolicyAdvice(routePolicyList);
}
// wrap in route inflight processor to track number of inflight exchanges for the route
internal.addRouteInflightRepositoryAdvice(camelContext.getInflightRepository(), route.getRouteId());
// wrap in JMX instrumentation processor that is used for performance stats
ManagementInterceptStrategy managementInterceptStrategy = route.getManagementInterceptStrategy();
if (managementInterceptStrategy != null) {
internal.addManagementInterceptStrategy(managementInterceptStrategy.createProcessor("route"));
}
// wrap in route lifecycle
internal.addRouteLifecycleAdvice();
// add advices
if (definition.getRestBindingDefinition() != null) {
try {
// when disabling bean or processor we should also disable rest-dsl binding advice
// (the global option is disabled unless its value is "true", ignoring case)
boolean disabled
= "true".equalsIgnoreCase(route.getCamelContext().getGlobalOption(DISABLE_BEAN_OR_PROCESS_PROCESSORS));
if (!disabled) {
internal.addAdvice(
new RestBindingReifier(route, definition.getRestBindingDefinition()).createRestBindingAdvice());
}
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
}
// wrap in contract
if (definition.getInputType() != null || definition.getOutputType() != null) {
Contract contract = new Contract();
if (definition.getInputType() != null) {
contract.setInputType(parseString(definition.getInputType().getUrn()));
contract.setValidateInput(parseBoolean(definition.getInputType().getValidate(), false));
}
if (definition.getOutputType() != null) {
contract.setOutputType(parseString(definition.getOutputType().getUrn()));
contract.setValidateOutput(parseBoolean(definition.getOutputType().getValidate(), false));
}
internal.addAdvice(new ContractAdvice(contract));
// make sure to enable data type as its in use when using
// input/output types on routes
// (this is set on the camel context, not only on this route)
camelContext.setUseDataType(true);
}
// wrap with variable
if (variable != null) {
internal.addAdvice(new VariableAdvice(variable));
}
// and create the route that wraps all of this
route.setProcessor(internal);
route.getProperties().putAll(routeProperties);
route.setStartupOrder(startupOrder);
// auto startup is only set on the route when it was configured on the definition
if (isAutoStartup != null) {
LOG.debug("Using AutoStartup {} on route: {}", isAutoStartup, definition.getId());
route.setAutoStartup(isAutoStartup);
}
// after the route is created then set the route on the policy processor(s) so we get hold of it
internal.setRouteOnAdvices(route);
// invoke init on route policy
if (routePolicyList != null && !routePolicyList.isEmpty()) {
for (RoutePolicy policy : routePolicyList) {
policy.onInit(route);
}
}
// inject the route error handler for processors that are error handler aware
// this needs to be done here at the end because the route may be transactional and have a transaction error handler
// automatic be configured which some EIPs like Multicast/RecipientList needs to be using for special fine-grained error handling
ErrorHandlerFactory builder = route.getErrorHandlerFactory();
Processor errorHandler = ((ModelCamelContext) camelContext).getModelReifierFactory().createErrorHandler(route,
builder, null);
prepareErrorHandlerAware(route, errorHandler);
// only during startup phase
// (the check compares enum ordinals, so it depends on the declaration order of ServiceStatus)
if (camelContext.getStatus().ordinal() < ServiceStatus.Started.ordinal()) {
// okay route has been created from the model, then the model is no longer needed, and we can de-reference
camelContext.getCamelContextExtension().addBootstrap(route::clearRouteModel);
}
if (definition.getRouteTemplateContext() != null) {
// make route stop beans from the local repository (route templates / kamelets)
Service wrapper = new ServiceSupport() {
// stopping the service is handled the same way as closing it
@Override
protected void doStop() throws Exception {
close();
}
// closes the local bean repository of the route template context if it is Closeable
@Override
public void close() throws IOException {
BeanRepository repo = definition.getRouteTemplateContext().getLocalBeanRepository();
if (repo instanceof Closeable obj) {
IOHelper.close(obj);
}
super.close();
}
};
// NOTE(review): the meaning of the boolean argument is not visible here - confirm against Route#addService
route.addService(wrapper, true);
}
return route;
}