in src/main/java/org/apache/flink/connector/rocketmq/source/reader/deserializer/RowDeserializationSchema.java [139:184]
/**
 * Converts each message of a batch into row data and hands it to the collector.
 *
 * <p>For every message, {@code collector.message} is set first, then one of three paths is
 * taken:
 *
 * <ul>
 *   <li>{@link #isOnlyHaveVarbinaryDataField()} is true: the raw message body is stored at the
 *       column index given by {@code dataIndexMapping.get(0)}, and every other column is
 *       filled from the value returned by {@code getHeaderValue(message, index)}.
 *   <li>{@link #isAllHeaderField()} is true: every column is filled from
 *       {@code getHeaderValue(message, index)}.
 *   <li>Otherwise: the message is delegated to {@code deserializeBytesMessage}, unless its
 *       body is {@code null}, in which case that single message is skipped.
 * </ul>
 *
 * @param messages the batch to deserialize; a {@code null} or empty list is a no-op
 * @param collector receives the current message and the produced rows
 */
private void deserialize(List<BytesMessage> messages, MetadataCollector collector) {
    if (null == messages || messages.isEmpty()) {
        return;
    }
    for (BytesMessage message : messages) {
        // Expose the current message on the collector before any row is emitted.
        collector.message = message;
        if (isOnlyHaveVarbinaryDataField()) {
            GenericRowData rowData = new GenericRowData(totalColumnSize);
            int dataIndex = dataIndexMapping.get(0);
            // The body is stored as-is (no string decoding) in the single data column.
            rowData.setField(dataIndex, message.getData());
            for (int index = 0; index < totalColumnSize; index++) {
                if (index == dataIndex) {
                    // Already populated with the raw body above.
                    continue;
                }
                String headerValue = getHeaderValue(message, index);
                rowData.setField(
                        index,
                        StringSerializer.deserialize(
                                headerValue,
                                fieldTypes[index],
                                fieldDataTypes[index],
                                new HashSet<>()));
            }
            collector.collect(rowData);
        } else if (isAllHeaderField()) {
            GenericRowData rowData = new GenericRowData(totalColumnSize);
            for (int index = 0; index < totalColumnSize; index++) {
                String headerValue = getHeaderValue(message, index);
                rowData.setField(
                        index,
                        StringSerializer.deserialize(
                                headerValue,
                                fieldTypes[index],
                                fieldDataTypes[index],
                                new HashSet<>()));
            }
            collector.collect(rowData);
        } else {
            if (message.getData() == null) {
                LOGGER.info("Deserialize empty BytesMessage body, ignore the empty message.");
                // Skip only this message; returning here would silently drop every
                // remaining message in the batch.
                continue;
            }
            deserializeBytesMessage(message, collector);
        }
    }
}