public List poll()

in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java [369:452]


    /**
     * Fetches the next non-empty canal message, converts its entries into {@link ConnectRecord}s
     * and acknowledges the message on the canal server.
     *
     * <p>When {@code sourceConfig.getBatchTimeout()} is negative the method polls without a timeout and
     * calls {@code applyWait} between empty fetches; otherwise it polls using the configured timeout
     * (in milliseconds). In both modes the loop ends as soon as a non-empty message arrives or
     * {@code running} becomes {@code false}.
     *
     * <p>Records are grouped by the key returned from {@code EntryParser.parse} (the Xid offset). Each
     * group is split into chunks of at most {@code sourceConfig.getBatchSize()} records, and every chunk
     * becomes one {@link ConnectRecord} whose data is the UTF-8 encoded JSON of that chunk.
     *
     * @return the records built from the fetched message; an empty list if the connector stopped running
     *         before a non-empty message was obtained, or if the message produced no records
     * @throws RuntimeException if a raw entry of the message cannot be parsed
     */
    public List<ConnectRecord> poll() {
        int emptyTimes = 0;
        com.alibaba.otter.canal.protocol.Message message = null;
        if (sourceConfig.getBatchTimeout() < 0) {
            while (running) {
                message = canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize());
                if (message != null && message.getId() != -1L) {
                    break;
                }
                // empty fetch: wait before trying again
                applyWait(emptyTimes++);
            }
        } else { // perform with timeout
            while (running) {
                message =
                    canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize(), sourceConfig.getBatchTimeout(), TimeUnit.MILLISECONDS);
                if (message != null && message.getId() != -1L) {
                    break;
                }
            }
        }

        List<ConnectRecord> result = new ArrayList<>();

        // The loops above also terminate when `running` turns false, in which case `message` is either
        // null or the empty marker (id == -1). There is nothing to convert or acknowledge then, and
        // dereferencing it would fail, so return early instead of relying on a (normally disabled) assert.
        if (message == null || message.getId() == -1L) {
            return result;
        }

        List<Entry> entries;
        if (message.isRaw()) {
            entries = new ArrayList<>(message.getRawEntries().size());
            for (ByteString rawEntry : message.getRawEntries()) {
                try {
                    entries.add(Entry.parseFrom(rawEntry));
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("failed to parse raw canal entry, messageId " + message.getId(), e);
                }
            }
        } else {
            entries = message.getEntries();
        }

        // key: Xid offset
        Map<Long, List<CanalConnectRecord>> connectorRecordMap = EntryParser.parse(sourceConfig, entries, tableMgr);

        if (!connectorRecordMap.isEmpty()) {
            int configuredBatchSize = sourceConfig.getBatchSize();
            for (Map.Entry<Long, List<CanalConnectRecord>> entry : connectorRecordMap.entrySet()) {
                List<CanalConnectRecord> connectRecordList = entry.getValue();
                if (connectRecordList == null || connectRecordList.isEmpty()) {
                    // no rows for this key: nothing to emit, and there is no last record to read metadata from
                    continue;
                }
                CanalConnectRecord lastRecord = connectRecordList.get(connectRecordList.size() - 1);

                CanalRecordPartition canalRecordPartition = new CanalRecordPartition();
                canalRecordPartition.setServerUUID(sourceConfig.getServerUUID());
                canalRecordPartition.setJournalName(lastRecord.getJournalName());
                canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());

                // Xid offset with gtid
                CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
                canalRecordOffset.setOffset(entry.getKey());
                if (StringUtils.isNotEmpty(lastRecord.getGtid()) && StringUtils.isNotEmpty(lastRecord.getCurrentGtid())) {
                    canalRecordOffset.setGtid(lastRecord.getGtid());
                    canalRecordOffset.setCurrentGtid(lastRecord.getCurrentGtid());
                }

                // split record list; a non-positive batch size would never advance the index,
                // so in that case the whole group is emitted as a single chunk
                int total = connectRecordList.size();
                int chunkSize = configuredBatchSize > 0 ? configuredBatchSize : total;
                int totalBatches = (total + chunkSize - 1) / chunkSize;

                for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
                    int from = batchIndex * chunkSize;
                    int to = Math.min(from + chunkSize, total);
                    List<CanalConnectRecord> chunk = connectRecordList.subList(from, to);

                    ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis());
                    connectRecord.addExtension("messageId", String.valueOf(message.getId()));
                    connectRecord.addExtension("batchIndex", batchIndex);
                    connectRecord.addExtension("totalBatches", totalBatches);
                    connectRecord.setData(JsonUtils.toJSONString(chunk).getBytes(StandardCharsets.UTF_8));
                    result.add(connectRecord);
                }
            }
            log.debug("message {} has been processed", message);
        }
        log.debug("ack message, messageId {}", message.getId());
        canalServer.ack(clientIdentity, message.getId());

        return result;
    }