in extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java [55:133]
public void configure() throws Exception {
LOGGER.info("Configure Recurrent Route 'From Source'");
if (importConfigurationList == null) {
importConfigurationList = importConfigurationService.getAll();
}
ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class)
.log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !")
.handled(true)
.process(new LineSplitFailureHandler())
.onException(Exception.class)
.log(LoggingLevel.ERROR, "Failed to process file.")
.handled(true);
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
//Loop on multiple import configuration
for (final ImportConfiguration importConfiguration : importConfigurationList) {
if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType()) &&
importConfiguration.getProperties() != null && importConfiguration.getProperties().size() > 0) {
//Prepare Split Processor
LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping"));
lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles());
lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite());
lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty());
lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator());
lineSplitProcessor.setHasHeader(importConfiguration.isHasHeader());
lineSplitProcessor.setHasDeleteColumn(importConfiguration.isHasDeleteColumn());
lineSplitProcessor.setMultiValueDelimiter(importConfiguration.getMultiValueDelimiter());
lineSplitProcessor.setMultiValueSeparator(importConfiguration.getMultiValueSeparator());
lineSplitProcessor.setProfilePropertyTypes(profileService.getTargetPropertyTypes("profiles"));
String endpoint = (String) importConfiguration.getProperties().get("source");
endpoint += "&moveFailed=.error";
if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) {
ProcessorDefinition prDef = from(endpoint)
.routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop
.autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active
.shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
.onCompletion()
// this route is only invoked when the original route is complete as a kind
// of completion callback
.log(LoggingLevel.DEBUG, "ROUTE [" + importConfiguration.getItemId() + "] is now complete [" + new Date().toString() + "]")
// must use end to denote the end of the onCompletion route
.end()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_RUNNING);
importConfigurationService.save(importConfiguration, false);
}
})
.split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator()))
.log(LoggingLevel.DEBUG, "Splitted into ${exchangeProperty.CamelSplitSize} records")
.setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType))
.process(lineSplitProcessor)
.log(LoggingLevel.DEBUG, "Split IDX ${exchangeProperty.CamelSplitIndex} record")
.marshal(jacksonDataFormat)
.convertBodyTo(String.class);
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
} else {
LOGGER.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), importConfiguration.getItemId());
}
}
}
}