public void start()

in parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java [144:351]


    public void start() {
        super.start();
        MDC.put("destination", destination);
        // 配置transaction buffer
        // 初始化缓冲队列
        transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小
        transactionBuffer.start();
        // 构造bin log parser
        binlogParser = buildParser();// 初始化一下BinLogParser
        binlogParser.start();
        // 启动工作线程
        parseThread = new Thread(new Runnable() {

            public void run() {
                MDC.put("destination", String.valueOf(destination));
                ErosaConnection erosaConnection = null;
                boolean isMariaDB = false;
                while (running) {
                    try {
                        // 开始执行replication
                        // 1. 构造Erosa连接
                        erosaConnection = buildErosaConnection();

                        // 2. 启动一个心跳线程
                        startHeartBeat(erosaConnection);

                        // 3. 执行dump前的准备工作
                        preDump(erosaConnection);

                        erosaConnection.connect();// 链接

                        long queryServerId = erosaConnection.queryServerId();
                        if (queryServerId != 0) {
                            serverId = queryServerId;
                        }

                        if (erosaConnection instanceof MysqlConnection) {
                            isMariaDB = ((MysqlConnection) erosaConnection).isMariaDB();
                        }
                        // 4. 获取最后的位置信息
                        long start = System.currentTimeMillis();
                        logger.warn("---> begin to find start position, it will be long time for reset or first position");
                        EntryPosition position = findStartPosition(erosaConnection);
                        final EntryPosition startPosition = position;
                        if (startPosition == null) {
                            throw new PositionNotFoundException("can't find start position for " + destination);
                        }

                        if (!processTableMeta(startPosition)) {
                            throw new CanalParseException("can't find init table meta for " + destination
                                                          + " with position : " + startPosition);
                        }
                        long end = System.currentTimeMillis();
                        logger.warn("---> find start position successfully, {}", startPosition.toString() + " cost : "
                                                                                 + (end - start)
                                                                                 + "ms , the next step is binlog dump");
                        // 重新链接,因为在找position过程中可能有状态,需要断开后重建
                        erosaConnection.reconnect();

                        final SinkFunction sinkHandler = new SinkFunction<EVENT>() {

                            private LogPosition lastPosition;

                            public boolean sink(EVENT event) {
                                try {
                                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);

                                    if (!running) {
                                        return false;
                                    }

                                    if (entry != null) {
                                        exception = null; // 有正常数据流过,清空exception
                                        transactionBuffer.add(entry);
                                        // 记录一下对应的positions
                                        this.lastPosition = buildLastPosition(entry);
                                        // 记录一下最后一次有数据的时间
                                        lastEntryTime = System.currentTimeMillis();
                                    }
                                    return running;
                                } catch (TableIdNotFoundException e) {
                                    throw e;
                                } catch (Throwable e) {
                                    if (e.getCause() instanceof TableIdNotFoundException) {
                                        throw (TableIdNotFoundException) e.getCause();
                                    }
                                    // 记录一下,出错的位点信息
                                    processSinkError(e,
                                        this.lastPosition,
                                        startPosition.getJournalName(),
                                        startPosition.getPosition());
                                    throw new CanalParseException(e); // 继续抛出异常,让上层统一感知
                                }
                            }

                        };

                        // 4. 开始dump数据
                        if (parallel) {
                            // build stage processor
                            multiStageCoprocessor = buildMultiStageCoprocessor();
                            if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
                                GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(), isMariaDB);
                                ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
                                multiStageCoprocessor.start();
                                erosaConnection.dump(gtidSet, multiStageCoprocessor);
                            } else {
                                multiStageCoprocessor.start();
                                if (StringUtils.isEmpty(startPosition.getJournalName())
                                    && startPosition.getTimestamp() != null) {
                                    erosaConnection.dump(startPosition.getTimestamp(), multiStageCoprocessor);
                                } else {
                                    erosaConnection.dump(startPosition.getJournalName(),
                                        startPosition.getPosition(),
                                        multiStageCoprocessor);
                                }
                            }
                        } else {
                            if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
                                erosaConnection.dump(parseGtidSet(startPosition.getGtid(), isMariaDB), sinkHandler);
                            } else {
                                if (StringUtils.isEmpty(startPosition.getJournalName())
                                    && startPosition.getTimestamp() != null) {
                                    erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                                } else {
                                    erosaConnection.dump(startPosition.getJournalName(),
                                        startPosition.getPosition(),
                                        sinkHandler);
                                }
                            }
                        }
                    } catch (TableIdNotFoundException e) {
                        exception = e;
                        // 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap
                        // Event时间没解析过
                        needTransactionPosition.compareAndSet(false, true);
                        logger.error(String.format("dump address %s has an error, retrying. caused by ",
                            runningInfo.getAddress().toString()), e);
                    } catch (Throwable e) {
                        processDumpError(e);
                        exception = e;
                        if (!running) {
                            if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
                                throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
                                    runningInfo.getAddress().toString()), e);
                            }
                        } else {
                            logger.error(String.format("dump address %s has an error, retrying. caused by ",
                                runningInfo.getAddress().toString()), e);
                            sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                        }
                        if (parserExceptionHandler != null) {
                            parserExceptionHandler.handle(e);
                        }
                    } finally {
                        // 重新置为中断状态
                        Thread.interrupted();
                        // 关闭一下链接
                        afterDump(erosaConnection);
                        try {
                            if (erosaConnection != null) {
                                erosaConnection.disconnect();
                            }
                        } catch (IOException e1) {
                            if (!running) {
                                throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
                                    runningInfo.getAddress().toString()),
                                    e1);
                            } else {
                                logger.error("disconnect address {} has an error, retrying., caused by ",
                                    runningInfo.getAddress().toString(),
                                    e1);
                            }
                        }
                    }
                    // 出异常了,退出sink消费,释放一下状态
                    eventSink.interrupt();
                    transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                    binlogParser.reset();// 重新置位
                    if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
                        // 处理 RejectedExecutionException
                        try {
                            multiStageCoprocessor.stop();
                        } catch (Throwable t) {
                            logger.debug("multi processor rejected:", t);
                        }
                    }

                    if (running) {
                        // sleep一段时间再进行重试
                        try {
                            Thread.sleep(10000 + RandomUtils.nextInt(10000));
                        } catch (InterruptedException e) {
                        }
                    }
                }
                MDC.remove("destination");
            }
        });

        parseThread.setUncaughtExceptionHandler(handler);
        parseThread.setName(String.format("destination = %s , address = %s , EventParser",
            destination,
            runningInfo == null ? null : runningInfo.getAddress()));
        parseThread.start();
    }