public CamelKafkaConnectMain build()

in core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java [223:382]


        /**
         * Creates a {@link CamelKafkaConnectMain} around the given context and wires it up:
         * binds the error handler (and, when idempotency is enabled, the idempotent
         * repository) into the registry, collects the initial Camel properties, registers
         * the {@code ckc*} route templates and finally assembles the route going from
         * {@code from} to {@code to}.
         *
         * <p>All settings other than {@code camelContext} (for example {@code props},
         * {@code errorHandler}, {@code marshallDataFormat}, {@code from}, {@code to}) are
         * read from the enclosing scope.
         *
         * @param camelContext the Camel context handed to the {@link CamelKafkaConnectMain} constructor
         * @return the configured, not yet started, {@link CamelKafkaConnectMain}
         */
        public CamelKafkaConnectMain build(CamelContext camelContext) {
            CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
            camelMain.configure().setAutoConfigurationLogSummary(false);
            //TODO: make it configurable
            camelMain.configure().setDumpRoutes(true);

            // Work on a copy so the template parameters added below do not leak into props.
            Properties camelProperties = new Properties();
            camelProperties.putAll(props);

            //error handler
            // A plain default error handler is always bound first; it stays in place when
            // errorHandler is null or is neither "no" nor "default".
            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
            if (errorHandler != null) {
                switch (errorHandler) {
                    case "no":
                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
                        break;
                    case "default":
                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
                        break;
                    default:
                        break;
                }
            }

            //dataformats
            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
                camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
            }
            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
                camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
            }

            //aggregator
            if (!ObjectHelper.isEmpty(aggregationSize)) {
                camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
            }
            if (!ObjectHelper.isEmpty(aggregationTimeout)) {
                camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
            }

            //idempotency
            if (idempotencyEnabled) {
                // NOTE(review): a String switch throws NullPointerException on a null selector;
                // unlike errorHandler above, expressionType is not null-checked here - confirm
                // it is always set when idempotency is enabled.
                switch (expressionType) {
                    case "body":
                        camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
                        break;
                    case "header":
                        camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
                        break;
                    default:
                        // Any other value leaves the template's own default expression in effect.
                        break;
                }
                // Instantiating the idempotent Repository here and inject it in registry to be referenced
                IdempotentRepository idempotentRepo = null;
                switch (idempotentRepositoryType) {
                    case "memory":
                        idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
                        break;
                    case "kafka":
                        idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers, idempotentRepositoryKafkaMaxCacheSize, idempotentRepositoryKafkaPollDuration);
                        break;
                    default:
                        break;
                }
                // NOTE(review): for a repository type other than "memory" or "kafka" the value
                // bound here is null - verify how the registry treats a null bean.
                camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo);
            }

            //remove headers
            if (!ObjectHelper.isEmpty(headersExcludePattern)) {
                camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
            }

            // log filtered properties and set initial camel properties
            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
            camelMain.setInitialProperties(camelProperties);

            camelMain.configure().addRoutesBuilder(new RouteBuilder() {
                @Override
                public void configure() {

                    //create marshal template
                    routeTemplate("ckcMarshal")
                            .templateParameter("marshal", "dummyDataformat")
                            .from("kamelet:source")
                            .marshal("{{marshal}}")
                            .to("kamelet:sink");

                    //create unmarshal template
                    // This step must unmarshal (it previously called marshal, applying the
                    // configured data format in the wrong direction).
                    routeTemplate("ckcUnMarshal")
                            .templateParameter("unmarshal", "dummyDataformat")
                            .from("kamelet:source")
                            .unmarshal("{{unmarshal}}")
                            .to("kamelet:sink");

                    //create aggregator template
                    routeTemplate("ckcAggregator")
                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
                            .templateParameter("aggregationSize", "1")
                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
                            .from("kamelet:source")
                            // constant(true) gives every exchange the same correlation key
                            .aggregate(constant(true))
                                .aggregationStrategy("{{aggregationStrategy}}")
                                .completionSize("{{aggregationSize}}")
                                .completionTimeout("{{aggregationTimeout}}")
                                .to("kamelet:sink")
                            .end();

                    //create idempotent template
                    routeTemplate("ckcIdempotent")
                            .templateParameter("idempotentExpression", "dummyExpression")
                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
                            .from("kamelet:source")
                            .idempotentConsumer(simple("{{idempotentExpression}}")).idempotentRepository("{{idempotentRepository}}")
                            .to("kamelet:sink");

                    //create removeHeader template
                    routeTemplate("ckcRemoveHeader")
                            // "(?!)" is an empty negative lookahead, a regex that never matches
                            .templateParameter("headersExcludePattern", "(?!)")
                            .from("kamelet:source")
                            .removeHeaders("{{headersExcludePattern}}")
                            .to("kamelet:sink");

                    //creating source template
                    // The "errorHandler" parameter is declared, but the reference passed to
                    // errorHandler(...) is the literal bean name. This template is not used
                    // by the route assembled at the end of this method.
                    routeTemplate("ckcSource")
                            .templateParameter("fromUrl")
                            .templateParameter("errorHandler", "ckcErrorHandler")
                            .from("{{fromUrl}}")
                            .errorHandler("ckcErrorHandler")
                            .to("kamelet:sink");

                    //creating sink template
                    // Same remarks as for ckcSource: literal error handler name, and not used
                    // by the route assembled below.
                    routeTemplate("ckcSink")
                            .templateParameter("toUrl")
                            .templateParameter("errorHandler", "ckcErrorHandler")
                            .from("kamelet:source")
                            .errorHandler("ckcErrorHandler")
                            .to("{{toUrl}}");

                    //creating the actual route
                    // Optional steps are appended in a fixed order: marshal, unmarshal,
                    // aggregate, idempotent; header removal is always appended last.
                    ProcessorDefinition<?> rd = from(from);
                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
                        rd = rd.kamelet("ckcMarshal");
                    }
                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
                        rd = rd.kamelet("ckcUnMarshal");
                    }
                    // Aggregation is enabled by the presence of a bean named "aggregate" in the registry.
                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
                        rd = rd.kamelet("ckcAggregator");
                    }
                    if (idempotencyEnabled) {
                        rd = rd.kamelet("ckcIdempotent");
                    }
                    rd = rd.kamelet("ckcRemoveHeader");
                    rd.toD(to);
                }
            });

            return camelMain;
        }