public void process()

in extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java [58:174]


    public void process(Exchange exchange) throws Exception {

        //In case of one shot import we check the header and overwrite import config
        ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT);
        String configType = (String) exchange.getIn().getHeader(RouterConstants.HEADER_CONFIG_TYPE);
        if (importConfigOneShot != null) {
            fieldsMapping = (Map<String, Integer>) importConfigOneShot.getProperties().get("mapping");
            propertiesToOverwrite = importConfigOneShot.getPropertiesToOverwrite();
            mergingProperty = importConfigOneShot.getMergingProperty();
            overwriteExistingProfiles = importConfigOneShot.isOverwriteExistingProfiles();
            columnSeparator = importConfigOneShot.getColumnSeparator();
            hasHeader = importConfigOneShot.isHasHeader();
            hasDeleteColumn = importConfigOneShot.isHasDeleteColumn();
            multiValueSeparator = importConfigOneShot.getMultiValueSeparator();
            multiValueDelimiter = importConfigOneShot.getMultiValueDelimiter();
        }

        if ((Integer) exchange.getProperty("CamelSplitIndex") == 0 && hasHeader) {
            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
            return;
        }

        RFC4180Parser rfc4180Parser = new RFC4180ParserBuilder()
                .withSeparator(columnSeparator.charAt(0))
                .build();

        LOGGER.debug("$$$$ : LineSplitProcessor : BODY : {}", exchange.getIn().getBody());

        String[] profileData = rfc4180Parser.parseLine(((String) exchange.getIn().getBody()));

        ProfileToImport profileToImport = new ProfileToImport();
        profileToImport.setItemId(UUID.randomUUID().toString());
        profileToImport.setItemType("profile");
        profileToImport.setScope(RouterConstants.SYSTEM_SCOPE);

        if (profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) {
            if ((hasDeleteColumn && (fieldsMapping.size() > (profileData.length - 1)))
                    || (!hasDeleteColumn && (fieldsMapping.size() > (profileData.length)))
                    ) {
                throw new BadProfileDataFormatException("The mapping does not match the number of column : line [" + ((Integer) exchange.getProperty("CamelSplitIndex") + 1) + "]", new Throwable("MAPPING_COLUMN_MATCH"));
            }
            LOGGER.debug("$$$$ : LineSplitProcessor : MAPPING : {}", fieldsMapping.keySet());
            Map<String, Object> properties = new HashMap<>();
            for (String fieldMappingKey : fieldsMapping.keySet()) {
                PropertyType propertyType = RouterUtils.getPropertyTypeById(profilePropertyTypes, fieldMappingKey);

                if (fieldMappingKey != null && fieldsMapping.get(fieldMappingKey) != null && profileData != null && profileData[fieldsMapping.get(fieldMappingKey)] != null) {
                    LOGGER.debug("$$$$ : LineSplitProcessor : PropType value : {}", profileData[fieldsMapping.get(fieldMappingKey)].trim());
                } else {
                    LOGGER.debug("$$$$ : LineSplitProcessor : no profileData found for fieldMappingKey={}", fieldMappingKey);
                }

                if (profileData.length > fieldsMapping.get(fieldMappingKey)) {
                    try {
                        if (propertyType == null) {
                            LOGGER.error("No valid property type found for propertyTypeId={}", fieldMappingKey);
                        } else {
                            if (propertyType.getValueTypeId() == null) {
                                LOGGER.error("No value type id found for property type {}", propertyType.getItemId());
                            }
                        }
                        if (propertyType.getValueTypeId().equals("string") || propertyType.getValueTypeId().equals("email") ||
                                propertyType.getValueTypeId().equals("date")) {
                            if (BooleanUtils.isTrue(propertyType.isMultivalued())) {
                                String multivalueArray = profileData[fieldsMapping.get(fieldMappingKey)].trim();
                                if (StringUtils.isNotBlank(multiValueDelimiter) && multiValueDelimiter.length() == 2) {
                                    multivalueArray = multivalueArray.replaceAll("\\" + multiValueDelimiter.charAt(0), "").replaceAll("\\" + multiValueDelimiter.charAt(1), "");
                                }
                                if(multivalueArray.contains(multiValueSeparator)) {
                                    String[] valuesArray = multivalueArray.split("\\" + multiValueSeparator);
                                    properties.put(fieldMappingKey, valuesArray);
                                } else {
                                    if(StringUtils.isNotBlank(multivalueArray)) {
                                        properties.put(fieldMappingKey, new String[]{multivalueArray});
                                    } else {
                                        properties.put(fieldMappingKey, new String[]{});
                                    }
                                }
                            } else {
                                String singleValue = profileData[fieldsMapping.get(fieldMappingKey)].trim();
                                properties.put(fieldMappingKey, singleValue);
                            }
                        } else if (propertyType.getValueTypeId().equals("boolean")) {
                            properties.put(fieldMappingKey, Boolean.valueOf(profileData[fieldsMapping.get(fieldMappingKey)].trim()));
                        } else if (propertyType.getValueTypeId().equals("integer")) {
                            properties.put(fieldMappingKey, Integer.valueOf(profileData[fieldsMapping.get(fieldMappingKey)].trim()));
                        } else if (propertyType.getValueTypeId().equals("long")) {
                            properties.put(fieldMappingKey, Long.valueOf(profileData[fieldsMapping.get(fieldMappingKey)].trim()));
                        }
                    } catch (Throwable t) {
                        LOGGER.error("Error converting profileData", t);
                        if (fieldMappingKey != null && fieldsMapping.get(fieldMappingKey) != null && profileData != null && profileData[fieldsMapping.get(fieldMappingKey)] != null) {
                            throw new BadProfileDataFormatException("Unable to convert '" + profileData[fieldsMapping.get(fieldMappingKey)].trim() + "' to " + propertyType!=null?propertyType.getValueTypeId():"Null propertyType ", new Throwable("DATA_TYPE"));
                        } else {
                            throw new BadProfileDataFormatException("Unable to find profile data for key " + fieldMappingKey, new Throwable("DATA_TYPE"));
                        }
                    }

                }
            }
            profileToImport.setProperties(properties);
            profileToImport.setMergingProperty(mergingProperty);
            profileToImport.setPropertiesToOverwrite(propertiesToOverwrite);
            profileToImport.setOverwriteExistingProfiles(overwriteExistingProfiles);
            if (hasDeleteColumn && StringUtils.isNotBlank(profileData[profileData.length - 1]) &&
                    Boolean.parseBoolean(profileData[profileData.length - 1].trim())) {
                profileToImport.setProfileToDelete(true);
            }
        } else {
            throw new BadProfileDataFormatException("Empty line : line [" + ((Integer) exchange.getProperty("CamelSplitIndex") + 1) + "]", new Throwable("EMPTY_LINE"));
        }
        exchange.getIn().setBody(profileToImport, ProfileToImport.class);
        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
            exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
            exchange.getIn().setHeader(KafkaConstants.KEY, "1");
        }
    }