in library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java [42:89]
public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampKeys") String timestampKeys, @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex) throws ParseException {
final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL);
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
ObjectMapper mapper = new ObjectMapper();
List<String> splittedKeys = new ArrayList<>();
JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>(){});
if (ObjectHelper.isNotEmpty(timestampKeys)) {
splittedKeys = Arrays.stream(timestampKeys.split(",")).collect(Collectors.toList());
}
Object rawTimestamp = null;
String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
for (String key:
splittedKeys) {
if (ObjectHelper.isNotEmpty(key)) {
rawTimestamp = body.get(key);
break;
}
}
long timestamp;
if (ObjectHelper.isNotEmpty(timestampKeyFormat) && ObjectHelper.isNotEmpty(rawTimestamp) && !timestampKeyFormat.equalsIgnoreCase("timestamp")) {
final SimpleDateFormat timestampKeyFmt = new SimpleDateFormat(timestampKeyFormat);
timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime();
} else {
timestamp = Long.valueOf((String) rawTimestamp);
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
String replace1;
String updatedTopic;
if (ObjectHelper.isNotEmpty(topicName)) {
replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
} else {
replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
}
ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic);
}
}