in src/main/java/org/apache/flink/connector/rocketmq/source/reader/deserializer/RowDeserializationSchema.java [198:241]
private void deserializeBytesMessage(BytesMessage message, Collector<RowData> collector) {
String body;
try {
body = new String(message.getData(), encoding);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
String[] lines = StringUtils.split(body, lineDelimiter);
for (String line : lines) {
String[] data = StringUtils.splitPreserveAllTokens(line, fieldDelimiter);
if (dataColumnSize == 1) {
data = new String[1];
data[0] = line;
}
if (data.length < dataColumnSize) {
data = handleFieldMissing(data);
} else if (data.length > dataColumnSize) {
data = handleFieldIncrement(data);
}
if (data == null) {
continue;
}
GenericRowData rowData = new GenericRowData(totalColumnSize);
boolean skip = false;
for (int index = 0; index < totalColumnSize; index++) {
try {
String fieldValue = getValue(message, data, line, index);
rowData.setField(
index,
StringSerializer.deserialize(
fieldValue,
fieldTypes[index],
fieldDataTypes[index],
new HashSet<>()));
} catch (Exception e) {
skip = handleException(rowData, index, data, e);
}
}
if (skip) {
continue;
}
collector.collect(rowData);
}
}