public void configure()

in extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java [55:133]


    public void configure() throws Exception {

        LOGGER.info("Configure Recurrent Route 'From Source'");

        if (importConfigurationList == null) {
            importConfigurationList = importConfigurationService.getAll();
        }

        ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class)
                .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !")
                .handled(true)
                .process(new LineSplitFailureHandler())
                .onException(Exception.class)
                .log(LoggingLevel.ERROR, "Failed to process file.")
                .handled(true);

        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
            prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
        } else {
            prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
        }

        //Loop on multiple import configuration
        for (final ImportConfiguration importConfiguration : importConfigurationList) {
            if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType()) &&
                    importConfiguration.getProperties() != null && importConfiguration.getProperties().size() > 0) {
                //Prepare Split Processor
                LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
                lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping"));
                lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles());
                lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite());
                lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty());
                lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator());
                lineSplitProcessor.setHasHeader(importConfiguration.isHasHeader());
                lineSplitProcessor.setHasDeleteColumn(importConfiguration.isHasDeleteColumn());
                lineSplitProcessor.setMultiValueDelimiter(importConfiguration.getMultiValueDelimiter());
                lineSplitProcessor.setMultiValueSeparator(importConfiguration.getMultiValueSeparator());
                lineSplitProcessor.setProfilePropertyTypes(profileService.getTargetPropertyTypes("profiles"));

                String endpoint = (String) importConfiguration.getProperties().get("source");
                endpoint += "&moveFailed=.error";

                if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) {
                    ProcessorDefinition prDef = from(endpoint)
                            .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop
                            .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active
                            .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
                            .onCompletion()
                            // this route is only invoked when the original route is complete as a kind
                            // of completion callback
                            .log(LoggingLevel.DEBUG, "ROUTE [" + importConfiguration.getItemId() + "] is now complete [" + new Date().toString() + "]")
                            // must use end to denote the end of the onCompletion route
                            .end()
                            .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_RUNNING);
                                    importConfigurationService.save(importConfiguration, false);
                                }
                            })
                            .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator()))
                            .log(LoggingLevel.DEBUG, "Splitted into ${exchangeProperty.CamelSplitSize} records")
                            .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType))
                            .process(lineSplitProcessor)
                            .log(LoggingLevel.DEBUG, "Split IDX ${exchangeProperty.CamelSplitIndex} record")
                            .marshal(jacksonDataFormat)
                            .convertBodyTo(String.class);

                    if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
                        prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
                    } else {
                        prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
                    }
                } else {
                    LOGGER.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), importConfiguration.getItemId());
                }
            }
        }
    }