in core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java [333:373]
/**
 * Copies the entries of {@code map} onto the headers of {@code record}, naming each
 * header {@code prefix + key} and choosing the typed {@code addXxx} header method
 * that matches the runtime type of the value.
 *
 * <p>Handled value types: {@link String}, {@link Boolean}, {@link Byte},
 * {@code byte[]}, {@code Byte[]}, {@link Date}, {@link BigDecimal}, {@link Double},
 * {@link Float}, {@link Integer}, {@link Long} and {@link Short}. Entries whose value
 * is {@code null} or of any other type match no branch and are skipped without error.
 *
 * @param record the source record whose headers are populated
 * @param map    the header names and values to copy
 * @param prefix the string prepended to every key to build the header name
 */
private void setAdditionalHeaders(SourceRecord record, Map<String, Object> map, String prefix) {
    for (Map.Entry<String, Object> entry : map.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        String keyCamelHeader = prefix + key;
        if (value instanceof String) {
            record.headers().addString(keyCamelHeader, (String)value);
        } else if (value instanceof Boolean) {
            record.headers().addBoolean(keyCamelHeader, (boolean)value);
        } else if (value instanceof Byte) {
            record.headers().addByte(keyCamelHeader, (byte)value);
        } else if (value instanceof byte[]) {
            // Primitive byte arrays can be handed over as they are; without this branch
            // they would match none of the checks and the header would be lost.
            record.headers().addBytes(keyCamelHeader, (byte[])value);
        } else if (value instanceof Byte[]) {
            // addBytes takes a primitive byte[], so the boxed array is unboxed element by element.
            // A null element throws NullPointerException on unboxing.
            final Byte[] array = (Byte[])value;
            final byte[] bytes = new byte[array.length];
            for (int i = 0; i < array.length; i++) {
                bytes[i] = array[i];
            }
            record.headers().addBytes(keyCamelHeader, bytes);
        } else if (value instanceof Date) {
            record.headers().addTimestamp(keyCamelHeader, (Date)value);
        } else if (value instanceof BigDecimal) {
            //XXX: kafka connect configured header converter takes care of the encoding,
            //default: org.apache.kafka.connect.storage.SimpleHeaderConverter
            record.headers().addDecimal(keyCamelHeader, (BigDecimal)value);
        } else if (value instanceof Double) {
            record.headers().addDouble(keyCamelHeader, (double)value);
        } else if (value instanceof Float) {
            record.headers().addFloat(keyCamelHeader, (float)value);
        } else if (value instanceof Integer) {
            record.headers().addInt(keyCamelHeader, (int)value);
        } else if (value instanceof Long) {
            record.headers().addLong(keyCamelHeader, (long)value);
        } else if (value instanceof Short) {
            record.headers().addShort(keyCamelHeader, (short)value);
        }
        // Any other value type (including null) is intentionally left out of the headers.
    }
}