public void process()

in library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java [33:65]


    /** Literal placeholder in the topic format that is replaced by the incoming topic name. */
    private static final Pattern TOPIC_PLACEHOLDER = Pattern.compile("$[topic]", Pattern.LITERAL);

    /** Literal placeholder in the topic format that is replaced by the formatted timestamp. */
    private static final Pattern TIMESTAMP_PLACEHOLDER = Pattern.compile("$[timestamp]", Pattern.LITERAL);

    /**
     * Builds a topic name from {@code topicFormat} and stores it in the
     * {@code KafkaConstants.OVERRIDE_TOPIC} header of the exchange message.
     *
     * <p>Every {@code $[topic]} placeholder in the format is replaced by the value of the
     * {@code KafkaConstants.TOPIC} header (or by an empty string when that header is empty), and every
     * {@code $[timestamp]} placeholder is replaced by the message timestamp rendered in UTC with
     * {@code timestampFormat}.
     *
     * <p>The timestamp is read from the header named {@code timestampHeaderName}. It may be a
     * {@link Long} (epoch milliseconds), an {@link Instant}, a {@link Date}, or any other object whose
     * {@code toString()} is a decimal epoch-millisecond value. When the header is missing or blank the
     * message is left untouched.
     *
     * @param topicFormat         template containing the {@code $[topic]} / {@code $[timestamp]} placeholders
     * @param timestampFormat     {@link SimpleDateFormat} pattern used to render the timestamp
     * @param timestampHeaderName name of the message header that carries the timestamp
     * @param ex                  exchange whose message headers are read and updated
     * @throws IllegalArgumentException if the timestamp header holds a value that cannot be interpreted
     *                                  as epoch milliseconds, or if {@code timestampFormat} is not a valid pattern
     */
    public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) {
        final String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
        final Long timestamp = toEpochMillis(ex.getMessage().getHeader(timestampHeaderName), timestampHeaderName);
        if (timestamp == null) {
            // No usable timestamp on the message: nothing to route on, keep the headers as they are.
            return;
        }

        // SimpleDateFormat is not thread-safe, so a fresh instance is created for each call.
        final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        final String formattedTimestamp = fmt.format(new Date(timestamp));

        // An empty topic header simply removes the $[topic] placeholder from the result.
        final String topicReplacement = ObjectHelper.isNotEmpty(topicName) ? topicName : "";

        // quoteReplacement keeps '$' and '\' in the substituted values from being read as group references.
        final String withTopic = TOPIC_PLACEHOLDER.matcher(topicFormat)
                .replaceAll(Matcher.quoteReplacement(topicReplacement));
        final String updatedTopic = TIMESTAMP_PLACEHOLDER.matcher(withTopic)
                .replaceAll(Matcher.quoteReplacement(formattedTimestamp));

        ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic);
    }

    /**
     * Converts a raw timestamp header value to epoch milliseconds.
     *
     * @param rawTimestamp header value; may be {@code null}
     * @param headerName   header name, used only for the error message
     * @return the epoch-millisecond value, or {@code null} when the header is absent or blank
     * @throws IllegalArgumentException if the value is present but is not a valid decimal number
     */
    private static Long toEpochMillis(Object rawTimestamp, String headerName) {
        if (rawTimestamp == null) {
            return null;
        }
        if (rawTimestamp instanceof Long) {
            return (Long) rawTimestamp;
        }
        if (rawTimestamp instanceof Instant) {
            return ((Instant) rawTimestamp).toEpochMilli();
        }
        if (rawTimestamp instanceof Date) {
            return ((Date) rawTimestamp).getTime();
        }

        // Headers frequently arrive as strings (or other numeric wrappers); parse their text form.
        final String text = rawTimestamp.toString().trim();
        if (text.isEmpty()) {
            return null;
        }
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Header '" + headerName + "' does not hold an epoch-millisecond timestamp: " + rawTimestamp, e);
        }
    }