public void process()

in library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/ValueToKey.java [37:56]


    public void process(@ExchangeProperty("fields") String fields, Exchange ex) throws InvalidPayloadException {
        List<String> splittedFields = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
        Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>(){});
        if (ObjectHelper.isNotEmpty(fields)) {
            splittedFields = Arrays.stream(fields.split(",")).collect(Collectors.toList());
        }
        Map<Object, Object> key = new HashMap<>();
        for (Map.Entry entry:
             body.entrySet()) {
            final String fieldName = (String) entry.getKey();
            if (filterNames(fieldName, splittedFields)) {
                final Object fieldValue = entry.getValue();
                key.put(entry.getKey(), fieldValue);
            }
        }

        ex.getMessage().setHeader(KafkaConstants.KEY, key);
    }