in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java [210:236]
private RocketMQRowDataConverter createConverter() {
final int[] metadataPositions =
Stream.of(WritableMetadata.values())
.mapToInt(
m -> {
final int pos = metadataKeys.indexOf(m.key);
if (pos < 0) {
return -1;
}
return schema.getFieldCount() + pos;
})
.toArray();
return new RocketMQRowDataConverter(
topic,
tag,
dynamicColumn,
fieldDelimiter,
encoding,
isDynamicTag,
isDynamicTagIncluded,
writeKeysToBody,
keyColumns,
convertToRowTypeInfo(schema.toRowDataType(), schema.getFieldNames()),
schema.getFieldDataTypes(),
metadataKeys.size() > 0,
metadataPositions);
}