in src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQRowDataConverter.java [150:200]
public Message convert(RowData row) {
if (row.getRowKind() != RowKind.INSERT && row.getRowKind() != RowKind.UPDATE_AFTER) {
return null;
}
Message message = new Message();
message.setTopic(topic);
List<String> keys = new ArrayList<>();
for (int fieldIndex : keyFieldIndexes) {
keys.add(row.getString(fieldIndex).toString());
}
if (keys.size() > 0) {
message.setKeys(keys);
}
if (!isDynamicTag) {
if (tag != null && tag.length() > 0) {
message.setTags(tag);
}
} else {
checkState(tagFieldIndexes.length > 0, "No message tag column set.");
message.setTags(row.getString(tagFieldIndexes[0]).toString());
}
if (onlyVarbinary) {
message.setBody(row.getBinary(0));
message.setWaitStoreMsgOK(true);
} else {
Object[] values = new Object[bodyFieldIndexes.length];
for (int index = 0; index < bodyFieldIndexes.length; index++) {
values[index] =
RowData.createFieldGetter(
bodyFieldTypes[index].getLogicalType(),
bodyFieldIndexes[index])
.getFieldOrNull(row);
}
try {
message.setBody(StringUtils.join(values, fieldDelimiter).getBytes(encoding));
message.setWaitStoreMsgOK(true);
} catch (UnsupportedEncodingException e) {
LOG.error(
String.format(
"Unsupported ''{%s}'' encoding charset. Check the encoding configItem in the DDL.",
encoding),
e);
}
}
if (hasMetadata) {
String messageKeys = readMetadata(row, KEYS);
message.setKeys(messageKeys);
message.setTags(readMetadata(row, TAGS));
}
return message;
}