in library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java [33:65]
public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) {
final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL);
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
long timestamp;
String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
if (rawTimestamp instanceof Long) {
timestamp = (Long) rawTimestamp;
} else if (rawTimestamp instanceof Instant) {
timestamp = ((Instant) rawTimestamp).toEpochMilli();
} else {
timestamp = (Long) rawTimestamp;
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
String replace1;
String updatedTopic;
if (ObjectHelper.isNotEmpty(topicName)) {
replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
} else {
replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
}
ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic);
}
}