in src/main/java/org/apache/flink/connector/rocketmq/legacy/common/serialization/RowKeyValueDeserializationSchema.java [132:172]
/**
 * Decodes a raw message body into a row of {@code columnSize} fields.
 *
 * <p>The bytes are decoded using {@code encoding}. When exactly one column is expected the
 * whole decoded body becomes that single field; otherwise the body is tokenized with
 * {@code StringUtils.splitPreserveAllTokens} on {@code fieldDelimiter}. A token count that
 * differs from {@code columnSize} is delegated to {@code handleFieldMissing} (too few) or
 * {@code handleFieldIncrement} (too many); if the handler yields {@code null} the record is
 * dropped.
 *
 * <p>Each field is then converted through {@code StringSerializer.deserialize}. A failure on
 * any field is passed to {@code handleException}; if that handler returns {@code true} for at
 * least one field, the whole record is dropped.
 *
 * @param value raw message bytes; must not be {@code null}
 * @return the populated row, or {@code null} when the record is to be skipped
 * @throws RuntimeException if {@code encoding} names an unsupported charset, or if a handler
 *     chooses to propagate a failure
 */
private RowData deserializeValue(byte[] value) {
    final String body;
    try {
        body = new String(value, encoding);
    } catch (UnsupportedEncodingException e) {
        throw new RuntimeException("Unsupported encoding: " + encoding, e);
    }

    // A single-column schema takes the body verbatim, so tokenizing it would be wasted work.
    String[] data =
            columnSize == 1
                    ? new String[] {body}
                    : StringUtils.splitPreserveAllTokens(body, fieldDelimiter);

    if (data.length < columnSize) {
        data = handleFieldMissing(data);
    } else if (data.length > columnSize) {
        data = handleFieldIncrement(data);
    }
    // The mismatch handlers signal "drop this record" by returning null.
    if (data == null) {
        return null;
    }

    GenericRowData rowData = new GenericRowData(columnSize);
    boolean skip = false;
    for (int index = 0; index < columnSize; index++) {
        try {
            String fieldValue = getValue(data, body, index);
            rowData.setField(
                    index,
                    StringSerializer.deserialize(
                            fieldValue,
                            fieldTypes[index],
                            fieldDataTypes[index],
                            new HashSet<>()));
        } catch (Exception e) {
            // Accumulate instead of overwrite: a later field whose handler returns false
            // must not cancel a skip requested for an earlier field. The non-short-circuit
            // |= keeps invoking the handler for every failing field, as before.
            skip |= handleException(rowData, index, data, e);
        }
    }
    return skip ? null : rowData;
}