private void deserializeBytesMessage()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/deserializer/RowDeserializationSchema.java [198:241]


    /**
     * Decodes the payload of {@code message} into text, splits it into lines and fields, converts
     * each line into a {@link GenericRowData} and emits it through {@code collector}.
     *
     * <p>Per line:
     *
     * <ul>
     *   <li>When {@code dataColumnSize} is 1 the whole line is used as the single field and no
     *       field splitting is done; otherwise the line is split on {@code fieldDelimiter}.
     *   <li>A line with fewer or more fields than {@code dataColumnSize} is passed to {@code
     *       handleFieldMissing} / {@code handleFieldIncrement}; if the handler returns {@code
     *       null} the line is dropped.
     *   <li>Each of the {@code totalColumnSize} columns is resolved with {@code getValue} and
     *       converted with {@code StringSerializer.deserialize}. A failure on any column is
     *       delegated to {@code handleException}; if it returns {@code true} for at least one
     *       column the row is not emitted.
     * </ul>
     *
     * @param message message whose raw bytes are decoded using {@code encoding}
     * @param collector sink that receives one row for every line that is not dropped
     * @throws RuntimeException wrapping the {@link UnsupportedEncodingException} raised when
     *     {@code encoding} is not a supported charset name; {@code handleException} may
     *     presumably also throw, depending on its implementation (not visible here)
     */
    private void deserializeBytesMessage(BytesMessage message, Collector<RowData> collector) {
        String body;
        try {
            body = new String(message.getData(), encoding);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
        String[] lines = StringUtils.split(body, lineDelimiter);
        for (String line : lines) {
            String[] data;
            if (dataColumnSize == 1) {
                // Single data column: the line is the value, so skip the field split entirely.
                data = new String[] {line};
            } else {
                data = StringUtils.splitPreserveAllTokens(line, fieldDelimiter);
            }
            if (data.length < dataColumnSize) {
                data = handleFieldMissing(data);
            } else if (data.length > dataColumnSize) {
                data = handleFieldIncrement(data);
            }
            if (data == null) {
                // The field-count handler rejected this line.
                continue;
            }
            GenericRowData rowData = new GenericRowData(totalColumnSize);
            boolean skip = false;
            for (int index = 0; index < totalColumnSize; index++) {
                try {
                    String fieldValue = getValue(message, data, line, index);
                    rowData.setField(
                            index,
                            StringSerializer.deserialize(
                                    fieldValue,
                                    fieldTypes[index],
                                    fieldDataTypes[index],
                                    new HashSet<>()));
                } catch (Exception e) {
                    // Keep the flag sticky: once any column asks for the row to be skipped, a
                    // later failing column must not be able to reset that decision. The handler
                    // is still invoked for every failing column so its side effects are kept.
                    skip |= handleException(rowData, index, data, e);
                }
            }
            if (skip) {
                continue;
            }
            collector.collect(rowData);
        }
    }