private void setAdditionalHeaders()

in core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java [333:373]


    /**
     * Copies the entries of {@code map} onto the headers of {@code record}.
     *
     * <p>Each header is named {@code prefix + key}. The header type is chosen from the
     * runtime type of the value: {@code String}, {@code Boolean}, {@code Byte},
     * {@code byte[]}, {@code Byte[]}, {@code Date}, {@code BigDecimal}, {@code Double},
     * {@code Float}, {@code Integer}, {@code Long} and {@code Short} are supported.
     * Values of any other type, including {@code null}, match no branch and are
     * skipped without error.
     *
     * @param record the source record whose headers are populated
     * @param map    the key/value pairs to copy
     * @param prefix string prepended to every key to build the header name
     */
    private void setAdditionalHeaders(SourceRecord record, Map<String, Object> map, String prefix) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            String keyCamelHeader = prefix + key;

            if (value instanceof String) {
                record.headers().addString(keyCamelHeader, (String)value);
            } else if (value instanceof Boolean) {
                record.headers().addBoolean(keyCamelHeader, (boolean)value);
            } else if (value instanceof Byte) {
                record.headers().addByte(keyCamelHeader, (byte)value);
            } else if (value instanceof byte[]) {
                // Primitive arrays are not instances of Byte[]; without this branch
                // a byte[] value would fall through every check and be dropped.
                record.headers().addBytes(keyCamelHeader, (byte[])value);
            } else if (value instanceof Byte[]) {
                // addBytes takes a primitive array, so unbox element by element.
                // A null element fails here with a NullPointerException on unboxing.
                final Byte[] boxed = (Byte[])value;
                final byte[] bytes = new byte[boxed.length];

                for (int i = 0; i < boxed.length; i++) {
                    bytes[i] = boxed[i];
                }

                record.headers().addBytes(keyCamelHeader, bytes);
            } else if (value instanceof Date) {
                record.headers().addTimestamp(keyCamelHeader, (Date)value);
            } else if (value instanceof BigDecimal) {
                // The header converter configured in Kafka Connect takes care of the encoding
                // (default: org.apache.kafka.connect.storage.SimpleHeaderConverter).
                record.headers().addDecimal(keyCamelHeader, (BigDecimal)value);
            } else if (value instanceof Double) {
                record.headers().addDouble(keyCamelHeader, (double)value);
            } else if (value instanceof Float) {
                record.headers().addFloat(keyCamelHeader, (float)value);
            } else if (value instanceof Integer) {
                record.headers().addInt(keyCamelHeader, (int)value);
            } else if (value instanceof Long) {
                record.headers().addLong(keyCamelHeader, (long)value);
            } else if (value instanceof Short) {
                record.headers().addShort(keyCamelHeader, (short)value);
            }
            // Any other value type (or null) is intentionally left out of the headers.
        }
    }