public Message convert()

in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQRowDataConverter.java [150:200]


    public Message convert(RowData row) {
        if (row.getRowKind() != RowKind.INSERT && row.getRowKind() != RowKind.UPDATE_AFTER) {
            return null;
        }
        Message message = new Message();
        message.setTopic(topic);
        List<String> keys = new ArrayList<>();
        for (int fieldIndex : keyFieldIndexes) {
            keys.add(row.getString(fieldIndex).toString());
        }
        if (keys.size() > 0) {
            message.setKeys(keys);
        }
        if (!isDynamicTag) {
            if (tag != null && tag.length() > 0) {
                message.setTags(tag);
            }
        } else {
            checkState(tagFieldIndexes.length > 0, "No message tag column set.");
            message.setTags(row.getString(tagFieldIndexes[0]).toString());
        }
        if (onlyVarbinary) {
            message.setBody(row.getBinary(0));
            message.setWaitStoreMsgOK(true);
        } else {
            Object[] values = new Object[bodyFieldIndexes.length];
            for (int index = 0; index < bodyFieldIndexes.length; index++) {
                values[index] =
                        RowData.createFieldGetter(
                                        bodyFieldTypes[index].getLogicalType(),
                                        bodyFieldIndexes[index])
                                .getFieldOrNull(row);
            }
            try {
                message.setBody(StringUtils.join(values, fieldDelimiter).getBytes(encoding));
                message.setWaitStoreMsgOK(true);
            } catch (UnsupportedEncodingException e) {
                LOG.error(
                        String.format(
                                "Unsupported ''{%s}'' encoding charset. Check the encoding configItem in the DDL.",
                                encoding),
                        e);
            }
        }
        if (hasMetadata) {
            String messageKeys = readMetadata(row, KEYS);
            message.setKeys(messageKeys);
            message.setTags(readMetadata(row, TAGS));
        }
        return message;
    }