in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java [369:452]
public List<ConnectRecord> poll() {
int emptyTimes = 0;
com.alibaba.otter.canal.protocol.Message message = null;
if (sourceConfig.getBatchTimeout() < 0) {
while (running) {
message = canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize());
if (message == null || message.getId() == -1L) { // empty
applyWait(emptyTimes++);
} else {
break;
}
}
} else { // perform with timeout
while (running) {
message =
canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize(), sourceConfig.getBatchTimeout(), TimeUnit.MILLISECONDS);
if (message == null || message.getId() == -1L) { // empty
continue;
}
break;
}
}
List<Entry> entries;
assert message != null;
if (message.isRaw()) {
entries = new ArrayList<>(message.getRawEntries().size());
for (ByteString entry : message.getRawEntries()) {
try {
entries.add(Entry.parseFrom(entry));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
} else {
entries = message.getEntries();
}
List<ConnectRecord> result = new ArrayList<>();
// key: Xid offset
Map<Long, List<CanalConnectRecord>> connectorRecordMap = EntryParser.parse(sourceConfig, entries, tableMgr);
if (!connectorRecordMap.isEmpty()) {
Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet = connectorRecordMap.entrySet();
for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
List<CanalConnectRecord> connectRecordList = entry.getValue();
CanalConnectRecord lastRecord = entry.getValue().get(connectRecordList.size() - 1);
CanalRecordPartition canalRecordPartition = new CanalRecordPartition();
canalRecordPartition.setServerUUID(sourceConfig.getServerUUID());
canalRecordPartition.setJournalName(lastRecord.getJournalName());
canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
// Xid offset with gtid
Long binLogOffset = entry.getKey();
CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
canalRecordOffset.setOffset(binLogOffset);
if (StringUtils.isNotEmpty(lastRecord.getGtid()) && StringUtils.isNotEmpty(lastRecord.getCurrentGtid())) {
canalRecordOffset.setGtid(lastRecord.getGtid());
canalRecordOffset.setCurrentGtid(lastRecord.getCurrentGtid());
}
// split record list
List<List<CanalConnectRecord>> splitLists = new ArrayList<>();
for (int i = 0; i < connectRecordList.size(); i += sourceConfig.getBatchSize()) {
int end = Math.min(i + sourceConfig.getBatchSize(), connectRecordList.size());
List<CanalConnectRecord> subList = connectRecordList.subList(i, end);
splitLists.add(subList);
}
for (int i = 0; i < splitLists.size(); i++) {
ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis());
connectRecord.addExtension("messageId", String.valueOf(message.getId()));
connectRecord.addExtension("batchIndex", i);
connectRecord.addExtension("totalBatches", splitLists.size());
connectRecord.setData(JsonUtils.toJSONString(splitLists.get(i)).getBytes(StandardCharsets.UTF_8));
result.add(connectRecord);
}
}
log.debug("message {} has been processed", message);
}
log.debug("ack message, messageId {}", message.getId());
canalServer.ack(clientIdentity, message.getId());
return result;
}