in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQRowDataConverter.java [102:148]
/**
 * Pre-computes the column positions this converter works with.
 *
 * <p>Resolved against {@code rowTypeInfo}:
 *
 * <ul>
 *   <li>{@code onlyVarbinary} is switched on when the row has exactly one field and that
 *       field's type class is {@code byte[]};
 *   <li>{@code keyFieldIndexes} and {@code tagFieldIndexes} receive the positions of the
 *       configured key columns and of the dynamic tag column (empty arrays when they are
 *       not configured);
 *   <li>{@code bodyFieldIndexes} and {@code bodyFieldTypes} receive, in ascending field
 *       order, every field that was not excluded as a key or tag column.
 * </ul>
 *
 * <p>A configured column name that cannot be found in the row type fails the
 * {@code checkState} precondition.
 */
public void open() {
    final int arity = rowTypeInfo.getArity();

    // Special case: a single-column row whose only field is a byte[].
    if (arity == 1 && rowTypeInfo.getFieldTypes()[0].getTypeClass().equals(byte[].class)) {
        onlyVarbinary = true;
    }

    // Positions of columns that must NOT be copied into the body arrays.
    final Set<Integer> omittedFromBody = new HashSet<>();

    // --- message-key columns ---
    if (keyColumns == null) {
        keyFieldIndexes = new int[0];
    } else {
        keyFieldIndexes = new int[keyColumns.length];
        for (int pos = 0; pos < keyColumns.length; pos++) {
            final String keyColumn = keyColumns[pos];
            final int resolved = rowTypeInfo.getFieldIndex(keyColumn);
            checkState(
                    resolved >= 0,
                    String.format(
                            "[MetaQConverter] Could not find the message-key column: %s.",
                            keyColumn));
            keyFieldIndexes[pos] = resolved;
            // Key columns stay in the body only when writeKeysToBody is set.
            if (!writeKeysToBody) {
                omittedFromBody.add(resolved);
            }
        }
    }

    // --- dynamic tag column ---
    if (!isDynamicTag || dynamicColumn == null) {
        tagFieldIndexes = new int[0];
    } else {
        tagFieldIndexes = new int[1];
        final int tagPosition = rowTypeInfo.getFieldIndex(dynamicColumn);
        checkState(
                tagPosition >= 0,
                String.format(
                        "[MetaQConverter] Could not find the tag column: %s.", dynamicColumn));
        tagFieldIndexes[0] = tagPosition;
        // The tag column stays in the body only when isDynamicTagIncluded is set.
        if (!isDynamicTagIncluded) {
            omittedFromBody.add(tagPosition);
        }
    }

    // --- body columns: everything that was not excluded above ---
    // The set de-duplicates positions, so a column that is both key and tag counts once.
    final int bodyWidth = arity - omittedFromBody.size();
    bodyFieldIndexes = new int[bodyWidth];
    bodyFieldTypes = new DataType[bodyWidth];
    int slot = 0;
    for (int field = 0; field < arity; field++) {
        if (omittedFromBody.contains(field)) {
            continue;
        }
        bodyFieldIndexes[slot] = field;
        bodyFieldTypes[slot] = fieldDataTypes[field];
        slot++;
    }
}