in core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java [223:382]
/**
 * Builds a {@link CamelKafkaConnectMain} wrapping the given Camel context.
 *
 * <p>The method performs the following steps, in order:
 * <ol>
 *   <li>binds an error handler builder under the name {@code ckcErrorHandler} in the registry
 *       (a plain {@code DefaultErrorHandlerBuilder} unless {@code errorHandler} is
 *       {@code "no"} or {@code "default"});</li>
 *   <li>copies {@code props} into a fresh {@link Properties} object and adds the template
 *       parameters for the marshal, unmarshal, aggregator, idempotent and remove-header
 *       templates when the corresponding builder options are set;</li>
 *   <li>when idempotency is enabled, binds an idempotent repository under the name
 *       {@code ckcIdempotentRepository};</li>
 *   <li>logs the filtered properties and sets them as initial properties of the main;</li>
 *   <li>registers a {@code RouteBuilder} that declares the {@code ckc*} route templates and
 *       the actual route going from {@code from} to {@code to}.</li>
 * </ol>
 *
 * @param camelContext the Camel context handed to the {@code CamelKafkaConnectMain} constructor
 * @return the configured {@code CamelKafkaConnectMain} instance
 */
public CamelKafkaConnectMain build(CamelContext camelContext) {
    CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
    camelMain.configure().setAutoConfigurationLogSummary(false);
    //TODO: make it configurable
    camelMain.configure().setDumpRoutes(true);

    // Work on a copy so that the template parameters added below do not leak into props.
    Properties camelProperties = new Properties();
    camelProperties.putAll(props);

    //error handler
    // A default builder is always bound first; the switch below re-binds the same name
    // when a recognized error handler type has been requested.
    camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
    if (errorHandler != null) {
        switch (errorHandler) {
            case "no":
                camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
                break;
            case "default":
                camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
                break;
            default:
                // unrecognized value: keep the plain default builder bound above
                break;
        }
    }

    //dataformats
    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
        camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
    }
    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
        camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
    }

    //aggregator
    if (!ObjectHelper.isEmpty(aggregationSize)) {
        camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
    }
    if (!ObjectHelper.isEmpty(aggregationTimeout)) {
        camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
    }

    //idempotency
    if (idempotencyEnabled) {
        // NOTE(review): switching on a null expressionType / idempotentRepositoryType throws
        // a NullPointerException - presumably both are always set when idempotency is
        // enabled; confirm against the callers.
        switch (expressionType) {
            case "body":
                camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
                break;
            case "header":
                camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
                break;
            default:
                break;
        }
        // Instantiating the idempotent Repository here and inject it in registry to be referenced
        IdempotentRepository idempotentRepo = null;
        switch (idempotentRepositoryType) {
            case "memory":
                idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
                break;
            case "kafka":
                idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers, idempotentRepositoryKafkaMaxCacheSize, idempotentRepositoryKafkaPollDuration);
                break;
            default:
                break;
        }
        // NOTE(review): for an unrecognized repository type idempotentRepo is still null
        // here - confirm that the registry tolerates binding a null bean.
        camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo);
    }

    //remove headers
    if (!ObjectHelper.isEmpty(headersExcludePattern)) {
        camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
    }

    // log filtered properties and set initial camel properties
    List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
    LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);

    camelMain.setInitialProperties(camelProperties);

    camelMain.configure().addRoutesBuilder(new RouteBuilder() {
        @Override
        public void configure() {
            //create marshal template
            routeTemplate("ckcMarshal")
                    .templateParameter("marshal", "dummyDataformat")
                    .from("kamelet:source")
                    .marshal("{{marshal}}")
                    .to("kamelet:sink");

            //create unmarshal template
            // This template must unmarshal (not marshal) the body with the configured
            // data format: it is only added to the route when unmarshallDataFormat is set.
            routeTemplate("ckcUnMarshal")
                    .templateParameter("unmarshal", "dummyDataformat")
                    .from("kamelet:source")
                    .unmarshal("{{unmarshal}}")
                    .to("kamelet:sink");

            //create aggregator template
            routeTemplate("ckcAggregator")
                    //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
                    .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
                    .templateParameter("aggregationSize", "1")
                    .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
                    .from("kamelet:source")
                    .aggregate(constant(true))
                        .aggregationStrategy("{{aggregationStrategy}}")
                        .completionSize("{{aggregationSize}}")
                        .completionTimeout("{{aggregationTimeout}}")
                        .to("kamelet:sink")
                    .end();

            //create idempotent template
            routeTemplate("ckcIdempotent")
                    .templateParameter("idempotentExpression", "dummyExpression")
                    .templateParameter("idempotentRepository", "ckcIdempotentRepository")
                    .from("kamelet:source")
                    .idempotentConsumer(simple("{{idempotentExpression}}")).idempotentRepository("{{idempotentRepository}}")
                    .to("kamelet:sink");

            //create removeHeader template
            // The default pattern "(?!)" is an empty negative lookahead, which never matches.
            routeTemplate("ckcRemoveHeader")
                    .templateParameter("headersExcludePattern", "(?!)")
                    .from("kamelet:source")
                    .removeHeaders("{{headersExcludePattern}}")
                    .to("kamelet:sink");

            //creating source template
            // NOTE(review): the "errorHandler" template parameter is declared, but the error
            // handler reference below is the literal bean name - confirm whether
            // "{{errorHandler}}" was intended.
            routeTemplate("ckcSource")
                    .templateParameter("fromUrl")
                    .templateParameter("errorHandler", "ckcErrorHandler")
                    .from("{{fromUrl}}")
                    .errorHandler("ckcErrorHandler")
                    .to("kamelet:sink");

            //creating sink template
            // NOTE(review): same remark as for ckcSource regarding the "errorHandler" parameter.
            routeTemplate("ckcSink")
                    .templateParameter("toUrl")
                    .templateParameter("errorHandler", "ckcErrorHandler")
                    .from("kamelet:source")
                    .errorHandler("ckcErrorHandler")
                    .to("{{toUrl}}");

            //creating the actual route
            // Optional kamelet steps are chained in a fixed order:
            // marshal -> unmarshal -> aggregator -> idempotent -> remove headers.
            ProcessorDefinition<?> rd = from(from);
            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
                rd = rd.kamelet("ckcMarshal");
            }
            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
                rd = rd.kamelet("ckcUnMarshal");
            }
            // NOTE(review): the aggregator step is enabled by the presence of a bean named
            // "aggregate" in the registry; presumably this equals
            // CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME - confirm.
            if (getContext().getRegistry().lookupByName("aggregate") != null) {
                rd = rd.kamelet("ckcAggregator");
            }
            if (idempotencyEnabled) {
                rd = rd.kamelet("ckcIdempotent");
            }
            // the remove-header step is always present; its default pattern matches nothing
            rd = rd.kamelet("ckcRemoveHeader");
            rd.toD(to);
        }
    });

    return camelMain;
}