public void open()

in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQRowDataConverter.java [102:148]


    public void open() {
        if (rowTypeInfo.getArity() == 1
                && rowTypeInfo.getFieldTypes()[0].getTypeClass().equals(byte[].class)) {
            onlyVarbinary = true;
        }
        Set<Integer> excludedFields = new HashSet<>();
        if (keyColumns != null) {
            keyFieldIndexes = new int[keyColumns.length];
            for (int index = 0; index < keyColumns.length; index++) {
                int fieldIndex = rowTypeInfo.getFieldIndex(keyColumns[index]);
                checkState(
                        fieldIndex >= 0,
                        String.format(
                                "[MetaQConverter] Could not find the message-key column: %s.",
                                keyColumns[index]));
                keyFieldIndexes[index] = fieldIndex;
                if (!writeKeysToBody) {
                    excludedFields.add(fieldIndex);
                }
            }
        } else {
            keyFieldIndexes = new int[0];
        }
        if (isDynamicTag && dynamicColumn != null) {
            tagFieldIndexes = new int[1];
            int fieldIndex = rowTypeInfo.getFieldIndex(dynamicColumn);
            checkState(
                    fieldIndex >= 0,
                    String.format(
                            "[MetaQConverter] Could not find the tag column: %s.", dynamicColumn));
            tagFieldIndexes[0] = fieldIndex;
            if (!isDynamicTagIncluded) {
                excludedFields.add(fieldIndex);
            }
        } else {
            tagFieldIndexes = new int[0];
        }
        bodyFieldIndexes = new int[rowTypeInfo.getArity() - excludedFields.size()];
        bodyFieldTypes = new DataType[rowTypeInfo.getArity() - excludedFields.size()];
        int index = 0;
        for (int num = 0; num < rowTypeInfo.getArity(); num++) {
            if (!excludedFields.contains(num)) {
                bodyFieldIndexes[index] = num;
                bodyFieldTypes[index++] = fieldDataTypes[num];
            }
        }
    }