in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java [441:457]
/**
 * Converts the map stored at field {@code pos} of {@code row} into a list of headers.
 *
 * <p>The map's key array and value array are walked in parallel by index. An entry is
 * turned into a {@code KafkaHeader} only when both its key and its value are non-null;
 * any other entry is skipped.
 *
 * @param row the row to read the map field from
 * @param pos the position of the map field inside {@code row}
 * @return {@code null} if the field at {@code pos} is null, otherwise a {@code List<Header>}
 *     holding one header per entry with a non-null key and a non-null value
 */
public Object read(RowData row, int pos) {
    if (row.isNullAt(pos)) {
        return null;
    }

    final MapData headerMap = row.getMap(pos);
    final ArrayData names = headerMap.keyArray();
    final ArrayData payloads = headerMap.valueArray();

    final List<Header> result = new ArrayList<>();
    for (int idx = 0; idx < names.size(); idx++) {
        // Drop entries that lack either a key or a value.
        if (names.isNullAt(idx) || payloads.isNullAt(idx)) {
            continue;
        }
        result.add(new KafkaHeader(names.getString(idx).toString(), payloads.getBinary(idx)));
    }
    return result;
}