in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java [271:293]
/**
 * Builds a {@link ConnectRecord} from a consumed {@link MessageExt}.
 *
 * <p>The record timestamp and schema are read from the message properties
 * {@code RuntimeConfigDefine.CONNECT_TIMESTAMP} and
 * {@code RuntimeConfigDefine.CONNECT_SCHEMA}; each is {@code null} when the
 * corresponding property is absent or empty. The message body is decoded as
 * UTF-8 text and used as the record data, and every message property is copied
 * into the record's extension key-value set.
 *
 * @param messageExt the consumed message to convert
 * @return the record built from the message's topic/broker/queue position,
 *         queue offset, optional timestamp and schema, and body text
 * @throws NumberFormatException if the timestamp property is non-empty but is
 *         not a parsable {@code long}
 */
private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
    Map<String, String> properties = messageExt.getProperties();
    // Evaluate the null/empty guard once, before any lookup. Previously the map
    // was dereferenced first and only guarded later, so a null property map
    // raised a NullPointerException instead of producing a record without
    // timestamp, schema, or extensions.
    boolean hasProperties = MapUtils.isNotEmpty(properties);

    String connectTimestamp = hasProperties ? properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP) : null;
    Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;

    String connectSchema = hasProperties ? properties.get(RuntimeConfigDefine.CONNECT_SCHEMA) : null;
    Schema schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;

    RecordPartition recordPartition = convertToRecordPartition(
        messageExt.getTopic(), messageExt.getBrokerName(), messageExt.getQueueId(), messageExt.getMsgId());
    RecordOffset recordOffset = convertToRecordOffset(messageExt.getQueueOffset());

    // The body is always interpreted as UTF-8 text, regardless of the schema.
    String bodyStr = new String(messageExt.getBody(), StandardCharsets.UTF_8);
    ConnectRecord sinkRecord = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);

    // Carry all message properties along as record extensions; an empty
    // key-value set is still attached when there are no properties.
    KeyValue keyValue = new DefaultKeyValue();
    if (hasProperties) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            keyValue.put(entry.getKey(), entry.getValue());
        }
    }
    sinkRecord.addExtension(keyValue);
    return sinkRecord;
}