public void process()

in library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java [42:89]


    /**
     * Derives a timestamp-based topic name from the message body and stores it in the
     * {@code KafkaConstants.OVERRIDE_TOPIC} header.
     *
     * <p>The body is read as JSON and converted to a map. The keys listed in {@code timestampKeys}
     * are tried in order and the first one holding a non-empty value supplies the timestamp. The
     * timestamp is formatted in UTC with {@code timestampFormat} and substituted, together with the
     * current value of the {@code KafkaConstants.TOPIC} header, into {@code topicFormat}.
     *
     * <p>If the body is missing, no keys are configured, or none of the keys has a non-empty value,
     * the message is left unchanged (no override header is set).
     *
     * @param topicFormat        template for the resulting topic; every literal {@code $[topic]} is replaced
     *                           by the current topic name (or the empty string when there is none) and every
     *                           literal {@code $[timestamp]} by the formatted timestamp
     * @param timestampFormat    {@link SimpleDateFormat} pattern used to render the timestamp (UTC)
     * @param timestampKeys      comma-separated list of body field names to look the timestamp up in
     * @param timestampKeyFormat {@link SimpleDateFormat} pattern used to parse the raw value (UTC); when empty
     *                           or equal to {@code "timestamp"} (case-insensitive) the raw value is read as
     *                           epoch milliseconds
     * @param ex                 the exchange whose message is inspected and updated
     * @throws ParseException        if the raw value does not match {@code timestampKeyFormat}
     * @throws NumberFormatException if the raw value is expected to be epoch milliseconds but is not numeric
     */
    public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampKeys") String timestampKeys, @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex) throws ParseException {
        // Built up-front so that an invalid output pattern is reported regardless of the message content.
        final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

        final String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);

        // Find the first configured key that actually carries a value in the body.
        Object rawTimestamp = null;
        final JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
        if (jsonNodeBody != null && ObjectHelper.isNotEmpty(timestampKeys)) {
            final ObjectMapper mapper = new ObjectMapper();
            final Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>(){});
            if (body != null) {
                for (String key : timestampKeys.split(",")) {
                    if (ObjectHelper.isNotEmpty(key)) {
                        final Object candidate = body.get(key);
                        // Keep looking when this key is absent or empty so later keys act as fallbacks.
                        if (ObjectHelper.isNotEmpty(candidate)) {
                            rawTimestamp = candidate;
                            break;
                        }
                    }
                }
            }
        }

        if (rawTimestamp == null) {
            // Nothing to route on: leave the message untouched instead of failing.
            return;
        }

        final boolean epochMillis = !ObjectHelper.isNotEmpty(timestampKeyFormat)
                || timestampKeyFormat.equalsIgnoreCase("timestamp");
        final long timestamp;
        if (epochMillis) {
            // JSON numbers arrive as Number instances, JSON strings as String; accept both.
            timestamp = rawTimestamp instanceof Number
                    ? ((Number) rawTimestamp).longValue()
                    : Long.parseLong(rawTimestamp.toString().trim());
        } else {
            final SimpleDateFormat timestampKeyFmt = new SimpleDateFormat(timestampKeyFormat);
            timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
            timestamp = timestampKeyFmt.parse(rawTimestamp.toString()).getTime();
        }

        final String formattedTimestamp = fmt.format(new Date(timestamp));
        final String currentTopic = ObjectHelper.isNotEmpty(topicName) ? topicName : "";

        // String.replace substitutes literally (no regex), replacing every occurrence of each placeholder.
        final String updatedTopic = topicFormat
                .replace("$[topic]", currentTopic)
                .replace("$[timestamp]", formattedTimestamp);

        ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic);
    }