protected void process()

in pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java [104:138]


    protected void process() {
        while (running) {
            try {
                MDC.put(DESTINATION, canalSourceConfig.getDestination());
                connector.connect();
                log.info("start canal process");
                connector.subscribe();
                while (running) {
                    Message message = connector.getWithoutAck(canalSourceConfig.getBatchSize());
                    message.setRaw(false);
                    List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
                    long batchId = getMessageId(message);
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        if (flatMessages != null) {
                            CanalRecord<V> canalRecord = new CanalRecord<>(connector);
                            canalRecord.setId(batchId);
                            canalRecord.setRecord(extractValue(flatMessages));
                            consume(canalRecord);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove(DESTINATION);
            }
        }
    }