in library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/ValueToKey.java [37:56]
public void process(@ExchangeProperty("fields") String fields, Exchange ex) throws InvalidPayloadException {
List<String> splittedFields = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>(){});
if (ObjectHelper.isNotEmpty(fields)) {
splittedFields = Arrays.stream(fields.split(",")).collect(Collectors.toList());
}
Map<Object, Object> key = new HashMap<>();
for (Map.Entry entry:
body.entrySet()) {
final String fieldName = (String) entry.getKey();
if (filterNames(fieldName, splittedFields)) {
final Object fieldValue = entry.getValue();
key.put(entry.getKey(), fieldValue);
}
}
ex.getMessage().setHeader(KafkaConstants.KEY, key);
}