private void deserialize()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/deserializer/RowDeserializationSchema.java [139:184]


    private void deserialize(List<BytesMessage> messages, MetadataCollector collector) {
        if (null == messages || messages.size() == 0) {
            return;
        }
        for (BytesMessage message : messages) {
            collector.message = message;
            if (isOnlyHaveVarbinaryDataField()) {
                GenericRowData rowData = new GenericRowData(totalColumnSize);
                int dataIndex = dataIndexMapping.get(0);
                rowData.setField(dataIndex, message.getData());
                for (int index = 0; index < totalColumnSize; index++) {
                    if (index == dataIndex) {
                        continue;
                    }
                    String headerValue = getHeaderValue(message, index);
                    rowData.setField(
                            index,
                            StringSerializer.deserialize(
                                    headerValue,
                                    fieldTypes[index],
                                    fieldDataTypes[index],
                                    new HashSet<>()));
                }
                collector.collect(rowData);
            } else if (isAllHeaderField()) {
                GenericRowData rowData = new GenericRowData(totalColumnSize);
                for (int index = 0; index < totalColumnSize; index++) {
                    String headerValue = getHeaderValue(message, index);
                    rowData.setField(
                            index,
                            StringSerializer.deserialize(
                                    headerValue,
                                    fieldTypes[index],
                                    fieldDataTypes[index],
                                    new HashSet<>()));
                }
                collector.collect(rowData);
            } else {
                if (message.getData() == null) {
                    LOGGER.info("Deserialize empty BytesMessage body, ignore the empty message.");
                    return;
                }
                deserializeBytesMessage(message, collector);
            }
        }
    }